Skip to content
5 changes: 5 additions & 0 deletions .github/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ additionals
SECG
Certicom
RSAES
reprovide
reprovider
reproviding
reprovides
reprovided
2 changes: 0 additions & 2 deletions packages/kad-dht/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ export const second = 1000
export const minute = 60 * second
export const hour = 60 * minute

export const MAX_RECORD_AGE = 36 * hour

export const PROTOCOL = '/ipfs/kad/1.0.0'

/**
Expand Down
40 changes: 31 additions & 9 deletions packages/kad-dht/src/reprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
setMaxListeners(Infinity, this.shutdownController.signal)

this.timeout = setTimeout(() => {
this.cleanUp({
this.processRecords({
signal: AbortSignal.timeout(REPROVIDE_TIMEOUT)
}).catch(err => {
this.log.error('error running reprovide/cleanup - %e', err)
this.log.error('error running process to reprovide/cleanup - %e', err)
})
}, this.interval)
}
Expand All @@ -118,10 +118,10 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
* Check all provider records. Delete them if they have expired, reprovide
* them if the provider is us and the expiry is within the reprovide window.
*/
private async cleanUp (options?: AbortOptions): Promise<void> {
private async processRecords (options?: AbortOptions): Promise<void> {
try {
this.safeDispatchEvent('reprovide:start')

this.log('Starting reprovide/cleanup')
// Get all provider entries from the datastore
for await (const entry of this.datastore.query({
prefix: this.datastorePrefix
Expand All @@ -133,17 +133,20 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
const expires = created + this.validity
const now = Date.now()
const expired = now > expires
const isSelf = this.peerId.equals(peerId)

this.log.trace('comparing: %d < %d = %s %s', created, now - this.validity, expired, expired ? '(expired)' : '')
this.log.trace('comparing: %d (now) < %d (expires) = %s %s', now, expires, expired, expired ? '(expired)' : '(valid)')

// delete the record if it has expired
if (expired) {
// delete the record if it has expired and isn't us
// so that if user node is down for a while, we still persist provide intent
if (expired && !isSelf) {
await this.datastore.delete(entry.key, options)
}

// if the provider is us and we are within the reprovide threshold,
// reprovide the record
if (this.peerId.equals(peerId) && (now - expires) < this.reprovideThreshold) {
if (this.shouldReprovide(isSelf, expires)) {
this.log('reproviding %c as it is within the reprovide threshold (%d)', cid, this.reprovideThreshold)
this.queueReprovide(cid)
.catch(err => {
this.log.error('could not reprovide %c - %e', cid, err)
Expand All @@ -159,8 +162,9 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
this.safeDispatchEvent('reprovide:end')

if (this.running) {
this.log('queuing next re-provide/cleanup run in %d ms', this.interval)
this.timeout = setTimeout(() => {
this.cleanUp({
this.processRecords({
signal: AbortSignal.timeout(REPROVIDE_TIMEOUT)
}).catch(err => {
this.log.error('error running re-provide - %e', err)
Expand All @@ -170,6 +174,24 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
}
}

/**
* Determines if a record should be reprovided
*/
private shouldReprovide (isSelf: boolean, expires: number): boolean {
if (!isSelf) {
return false
}
const now = Date.now()

if (expires < now) {
// If the record has already expired, reprovide irrespective of the threshold
return true
}

// if the record is approaching expiration within the reprovide threshold
return expires - now < this.reprovideThreshold
}

private async queueReprovide (cid: CID, options?: AbortOptions): Promise<void> {
if (!this.running) {
return
Expand Down
6 changes: 3 additions & 3 deletions packages/kad-dht/src/rpc/handlers/get-value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { publicKeyToProtobuf } from '@libp2p/crypto/keys'
import { InvalidMessageError, NotFoundError } from '@libp2p/interface'
import { Libp2pRecord } from '@libp2p/record'
import {
MAX_RECORD_AGE
PROVIDERS_VALIDITY
} from '../../constants.js'
import { MessageType } from '../../message/dht.js'
import { bufferToRecordKey, isPublicKeyKey, fromPublicKeyKey } from '../../utils.js'
Expand Down Expand Up @@ -107,7 +107,7 @@ export class GetValueHandler implements DHTMessageHandler {
* Try to fetch a given record by from the local datastore.
* Returns the record if it is still valid, meaning
* - it was either authored by this node, or
* - it was received less than `MAX_RECORD_AGE` ago.
* - it was received less than `PROVIDERS_VALIDITY` ago.
*/
async _checkLocalDatastore (key: Uint8Array): Promise<Libp2pRecord | undefined> {
this.log('checkLocalDatastore looking for %b', key)
Expand All @@ -129,7 +129,7 @@ export class GetValueHandler implements DHTMessageHandler {

// Check validity: compare time received with max record age
if (record.timeReceived == null ||
Date.now() - record.timeReceived.getTime() > MAX_RECORD_AGE) {
Date.now() - record.timeReceived.getTime() > PROVIDERS_VALIDITY) {
// If record is bad delete it and return
await this.datastore.delete(dsKey)
return undefined
Expand Down
71 changes: 67 additions & 4 deletions packages/kad-dht/test/reprovider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ describe('reprovider', () => {
contentRouting,
threshold: 100,
validity: 200,
interval: 100,
interval: 200,
operationMetrics: {}
})

await start(reprovider)
})

afterEach(async () => {
Expand All @@ -74,6 +72,8 @@ describe('reprovider', () => {
it('should reprovide', async () => {
const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb')

await start(reprovider)

await providers.addProvider(cid, components.peerId)

expect(contentRouting.provide).to.have.property('callCount', 0)
Expand All @@ -88,6 +88,8 @@ describe('reprovider', () => {
it('should cancel reprovide', async () => {
const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb')

await start(reprovider)

await providers.addProvider(cid, components.peerId)

expect(contentRouting.provide).to.have.property('callCount', 0)
Expand All @@ -110,6 +112,9 @@ describe('reprovider', () => {

it('should remove expired provider records', async () => {
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')

await start(reprovider)

await Promise.all([
providers.addProvider(cid, peers[0].peerId),
providers.addProvider(cid, peers[1].peerId)
Expand All @@ -121,9 +126,67 @@ describe('reprovider', () => {
expect(provs[0].toString()).to.be.equal(peers[0].peerId.toString())
expect(provs[1].toString()).to.be.deep.equal(peers[1].peerId.toString())

await delay(400)
await delay(450)

const provsAfter = await providers.getProviders(cid)
expect(provsAfter).to.have.length(0)
})

it('should delete expired records from other peers but preserve own expired records', async () => {
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')

await start(reprovider)

// Add provider records - one from us, one from another peer
await providers.addProvider(cid, components.peerId)
await providers.addProvider(cid, peers[0].peerId)

const provsBefore = await providers.getProviders(cid)
expect(provsBefore).to.have.length(2)

// Wait for records to expire (validity is 200ms)
await delay(250)

// Trigger reprovide cycle to process expired records
await pEvent(reprovider, 'reprovide:start')
await pEvent(reprovider, 'reprovide:end')

const provsAfter = await providers.getProviders(cid)

// Only our own record should remain, other peer's expired record should be deleted
expect(provsAfter).to.have.length(1)
expect(provsAfter[0].toString()).to.equal(components.peerId.toString())
})

describe('shouldReprovide', () => {
it('should return false for non-self providers', () => {
const expires = Date.now() + 50
const result = (reprovider as any).shouldReprovide(false, expires)
expect(result).to.be.false()
})

it('should return true when within reprovide threshold before expiration', () => {
const expires = Date.now() + 50
const result = (reprovider as any).shouldReprovide(true, expires)
expect(result).to.be.true()
})

it('should return true when within reprovide threshold after expiration', () => {
const expires = Date.now() - 50
const result = (reprovider as any).shouldReprovide(true, expires)
expect(result).to.be.true()
})

it('should return false when outside reprovide threshold before expiration', () => {
const expires = Date.now() + 150
const result = (reprovider as any).shouldReprovide(true, expires)
expect(result).to.be.false()
})

it('should return true when outside reprovide threshold after expiration', () => {
const expires = Date.now() - 150
const result = (reprovider as any).shouldReprovide(true, expires)
expect(result).to.be.true()
})
})
})
63 changes: 63 additions & 0 deletions packages/kad-dht/test/rpc/handlers/get-value.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { MemoryDatastore } from 'datastore-core'
import { TypedEventEmitter } from 'main-event'
import Sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { PROVIDERS_VALIDITY } from '../../../src/constants.js'
import { MessageType } from '../../../src/message/dht.js'
import { PeerRouting } from '../../../src/peer-routing/index.js'
import { GetValueHandler } from '../../../src/rpc/handlers/get-value.js'
Expand Down Expand Up @@ -186,4 +187,66 @@ describe('rpc - handlers - GetValue', () => {
expect(response.record).to.not.be.ok()
})
})

describe('record expiration', () => {
it('should return valid record within PROVIDERS_VALIDITY period', async () => {
const key = uint8ArrayFromString('hello')
const value = uint8ArrayFromString('world')
const record = new Libp2pRecord(key, value, new Date())

await datastore.put(utils.bufferToRecordKey('/dht/record', key), record.serialize())

const msg: Message = {
type: T,
key,
closer: [],
providers: []
}

peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([])

const response = await handler.handle(sourcePeer.peerId, msg)

expect(response).to.not.be.undefined()
expect(response.record).to.not.be.undefined()

if (response.record != null) {
const responseRecord = Libp2pRecord.deserialize(response.record)
expect(responseRecord.value).to.equalBytes(value)
}
})

it('should delete and return no record when expired beyond PROVIDERS_VALIDITY', async () => {
const key = uint8ArrayFromString('hello')
const value = uint8ArrayFromString('world')
// Create record with old timestamp (beyond PROVIDERS_VALIDITY)
const oldTimestamp = new Date(Date.now() - PROVIDERS_VALIDITY - 1000)
const record = new Libp2pRecord(key, value, oldTimestamp)

const dsKey = utils.bufferToRecordKey('/dht/record', key)
await datastore.put(dsKey, record.serialize())

// Verify record exists before the test
const existsBefore = await datastore.has(dsKey)
expect(existsBefore).to.be.true()

const msg: Message = {
type: T,
key,
closer: [],
providers: []
}

peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([])

const response = await handler.handle(sourcePeer.peerId, msg)

expect(response).to.not.be.undefined()
expect(response.record).to.be.undefined()

// Verify the expired record was deleted from datastore
const existsAfter = await datastore.has(dsKey)
expect(existsAfter).to.be.false()
})
})
})
Loading