diff --git a/package.json b/package.json index 7991a73d..dee0f02e 100644 --- a/package.json +++ b/package.json @@ -183,6 +183,7 @@ "p-defer": "^4.0.0", "p-queue": "^7.3.4", "private-ip": "^3.0.0", + "progress-events": "^1.0.0", "protons-runtime": "^5.0.0", "uint8arraylist": "^2.0.0", "uint8arrays": "^4.0.2", diff --git a/src/content-fetching/index.ts b/src/content-fetching/index.ts index b6b317d7..67c86dbd 100644 --- a/src/content-fetching/index.ts +++ b/src/content-fetching/index.ts @@ -121,7 +121,7 @@ export class ContentFetching { } if (!sentCorrection) { - yield queryErrorEvent({ from, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }) + yield queryErrorEvent({ from, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }, options) } this.log.error('Failed error correcting entry') @@ -165,7 +165,7 @@ export class ContentFetching { } if (!(putEvent.record != null && uint8ArrayEquals(putEvent.record.value, Libp2pRecord.deserialize(record).value))) { - events.push(queryErrorEvent({ from: event.peer.id, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') })) + events.push(queryErrorEvent({ from: event.peer.id, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }, options)) } } @@ -240,7 +240,7 @@ export class ContentFetching { yield valueEvent({ value: localRec.value, from: this.components.peerId - }) + }, options) } catch (err: any) { this.log('error getting local value for %b', key, err) } @@ -252,7 +252,7 @@ export class ContentFetching { yield event if (event.name === 'PEER_RESPONSE' && (event.record != null)) { - yield valueEvent({ from: peer, value: event.record.value }) + yield valueEvent({ from: peer, value: event.record.value }, options) } } } diff --git a/src/content-routing/index.ts b/src/content-routing/index.ts index 2105ea3b..5e4a8e0d 100644 --- a/src/content-routing/index.ts +++ b/src/content-routing/index.ts @@ -17,7 +17,6 @@ import type { QueryManager } from '../query/manager.js' import type { QueryFunc } from '../query/types.js' import type { RoutingTable } from '../routing-table/index.js' import type { PeerInfo } from '@libp2p/interface-peer-info' -import type { AbortOptions } from '@libp2p/interfaces' import type { Logger } from '@libp2p/logger' import type { Multiaddr } from '@multiformats/multiaddr' import type { CID } from 'multiformats/cid' @@ -56,7 +55,7 @@ export class ContentRouting { * Announce to the network that we can provide the value for a given key and * are contactable on the given multiaddrs */ - async * provide (key: CID, multiaddrs: Multiaddr[], options: AbortOptions = {}): AsyncGenerator { + async * provide (key: CID, multiaddrs: Multiaddr[], options: QueryOptions = {}): AsyncGenerator { this.log('provide %s', key) // Add peer as provider @@ -94,7 +93,7 @@ export class ContentRouting { } } catch (err: any) { this.log.error('error sending provide record to peer %p', event.peer.id, err) - events.push(queryErrorEvent({ from: event.peer.id, error: err })) + events.push(queryErrorEvent({ from: event.peer.id, error: err }, options)) } return events @@ -153,8 +152,8 @@ export class ContentRouting { } } - yield peerResponseEvent({ from: this.components.peerId, messageType: MESSAGE_TYPE.GET_PROVIDERS, providers }) - yield providerEvent({ from: this.components.peerId, providers }) + yield peerResponseEvent({ from: this.components.peerId, messageType: MESSAGE_TYPE.GET_PROVIDERS, providers }, options) + yield providerEvent({ from: this.components.peerId, providers }, options) } // All done @@ -168,7 +167,10 @@ export class ContentRouting { const findProvidersQuery: QueryFunc = async function * ({ peer, signal }) { const request = new Message(MESSAGE_TYPE.GET_PROVIDERS, target, 0) - yield * self.network.sendRequest(peer, request, { signal }) + yield * self.network.sendRequest(peer, request, { + ...options, + signal + }) } const providers = new Set(provs.map(p => p.toString())) @@ -191,7 +193,7 @@ export class ContentRouting { } if (newProviders.length > 0) { - yield providerEvent({ from: event.from, providers: newProviders }) + yield providerEvent({ from: event.from, providers: newProviders }, options) } if (providers.size === toFind) { diff --git a/src/dual-kad-dht.ts b/src/dual-kad-dht.ts index 0210e0f5..48ee39b7 100644 --- a/src/dual-kad-dht.ts +++ b/src/dual-kad-dht.ts @@ -11,7 +11,6 @@ import { queryErrorEvent } from './query/events.js' import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerInfo } from '@libp2p/interface-peer-info' -import type { AbortOptions } from '@libp2p/interfaces' import type { CID } from 'multiformats/cid' const log = logger('libp2p:kad-dht') @@ -26,11 +25,11 @@ class DHTContentRouting implements ContentRouting { this.dht = dht } - async provide (cid: CID): Promise { - await drain(this.dht.provide(cid)) + async provide (cid: CID, options: QueryOptions = {}): Promise { + await drain(this.dht.provide(cid, options)) } - async * findProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator { + async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator { for await (const event of this.dht.findProviders(cid, options)) { if (event.name === 'PROVIDER') { yield * event.providers @@ -38,11 +37,11 @@ class DHTContentRouting implements ContentRouting { } } - async put (key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise { + async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise { await drain(this.dht.put(key, value, options)) } - async get (key: Uint8Array, options?: AbortOptions): Promise { + async get (key: Uint8Array, options?: QueryOptions): Promise { for await (const event of this.dht.get(key, options)) { if (event.name === 'VALUE') { return event.value @@ -63,7 +62,7 @@ class DHTPeerRouting implements PeerRouting { this.dht = dht } - async findPeer (peerId: PeerId, options: AbortOptions = {}): Promise { + async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise { for await (const event of this.dht.findPeer(peerId, options)) { if (event.name === 'FINAL_PEER') { return event.peer @@ -73,7 +72,7 @@ class DHTPeerRouting implements PeerRouting { throw new CodeError('Not found', 'ERR_NOT_FOUND') } - async * getClosestPeers (key: Uint8Array, options: AbortOptions = {}): AsyncIterable { + async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable { for await (const event of this.dht.getClosestPeers(key, options)) { if (event.name === 'FINAL_PEER') { yield event.peer @@ -207,7 +206,7 @@ export class DefaultDualKadDHT extends EventEmitter impleme )) { yield event - if (event.name === 'DIALING_PEER') { + if (event.name === 'DIAL_PEER') { queriedPeers = true } @@ -219,7 +218,7 @@ export class DefaultDualKadDHT extends EventEmitter impleme } } - if (event.name === 'SENDING_QUERY') { + if (event.name === 'SEND_QUERY') { queriedPeers = true } } @@ -232,7 +231,7 @@ export class DefaultDualKadDHT extends EventEmitter impleme yield queryErrorEvent({ from: this.components.peerId, error: new CodeError('Not found', 'ERR_NOT_FOUND') - }) + }, options) } } @@ -241,7 +240,7 @@ export class DefaultDualKadDHT extends EventEmitter impleme /** * Announce to the network that we can provide given key's value */ - async * provide (key: CID, options: AbortOptions = {}): AsyncGenerator { + async * provide (key: CID, options: QueryOptions = {}): AsyncGenerator { let sent = 0 let success = 0 const errors = [] @@ -256,7 +255,7 @@ export class DefaultDualKadDHT extends EventEmitter impleme for await (const event of merge(...dhts.map(dht => dht.provide(key, options)))) { yield event - if (event.name === 'SENDING_QUERY') { + if (event.name === 'SEND_QUERY') { sent++ } @@ -304,7 +303,7 @@ export class DefaultDualKadDHT extends EventEmitter impleme )) { yield event - if (event.name === 'SENDING_QUERY' || event.name === 'FINAL_PEER') { + if (event.name === 'SEND_QUERY' || event.name === 'FINAL_PEER') { queriedPeers = true } } diff --git a/src/index.ts b/src/index.ts index 5b1f64e3..a8df2341 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,19 +10,20 @@ import type { Registrar } from '@libp2p/interface-registrar' import type { AbortOptions } from '@libp2p/interfaces' import type { Datastore } from 'interface-datastore' import type { CID } from 'multiformats/cid' +import type { ProgressOptions, ProgressEvent } from 'progress-events' /** * The types of events emitted during DHT queries */ export enum EventTypes { - SENDING_QUERY = 0, + SEND_QUERY = 0, PEER_RESPONSE, FINAL_PEER, QUERY_ERROR, PROVIDER, VALUE, - ADDING_PEER, - DIALING_PEER + ADD_PEER, + DIAL_PEER } /** @@ -45,17 +46,27 @@ export interface DHTRecord { timeReceived?: Date } -export interface QueryOptions extends AbortOptions { +export type DHTProgressEvents = + ProgressEvent<'kad-dht:query:send-query', SendQueryEvent> | + ProgressEvent<'kad-dht:query:peer-response', PeerResponseEvent> | + ProgressEvent<'kad-dht:query:final-peer', FinalPeerEvent> | + ProgressEvent<'kad-dht:query:query-error', QueryErrorEvent> | + ProgressEvent<'kad-dht:query:provider', ProviderEvent> | + ProgressEvent<'kad-dht:query:value', ValueEvent> | + ProgressEvent<'kad-dht:query:add-peer', AddPeerEvent> | + ProgressEvent<'kad-dht:query:dial-peer', DialPeerEvent> + +export interface QueryOptions extends AbortOptions, ProgressOptions { queryFuncTimeout?: number } /** * Emitted when sending queries to remote peers */ -export interface SendingQueryEvent { +export interface SendQueryEvent { to: PeerId - type: EventTypes.SENDING_QUERY - name: 'SENDING_QUERY' + type: EventTypes.SEND_QUERY + name: 'SEND_QUERY' messageName: keyof typeof MessageType messageType: MessageType } @@ -118,22 +129,22 @@ export interface ValueEvent { /** * Emitted when peers are added to a query */ -export interface AddingPeerEvent { - type: EventTypes.ADDING_PEER - name: 'ADDING_PEER' +export interface AddPeerEvent { + type: EventTypes.ADD_PEER + name: 'ADD_PEER' peer: PeerId } /** * Emitted when peers are dialled as part of a query */ -export interface DialingPeerEvent { +export interface DialPeerEvent { peer: PeerId - type: EventTypes.DIALING_PEER - name: 'DIALING_PEER' + type: EventTypes.DIAL_PEER + name: 'DIAL_PEER' } -export type QueryEvent = SendingQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddingPeerEvent | DialingPeerEvent +export type QueryEvent = SendQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddPeerEvent | DialPeerEvent export interface RoutingTable { size: number diff --git a/src/network.ts b/src/network.ts index fabf574c..ca525af2 100644 --- a/src/network.ts +++ b/src/network.ts @@ -8,12 +8,12 @@ import * as lp from 'it-length-prefixed' import { pipe } from 'it-pipe' import { Message } from './message/index.js' import { - dialingPeerEvent, - sendingQueryEvent, + dialPeerEvent, + sendQueryEvent, peerResponseEvent, queryErrorEvent } from './query/events.js' -import type { KadDHTComponents, QueryEvent } from './index.js' +import type { KadDHTComponents, QueryEvent, QueryOptions } from './index.js' import type { Stream } from '@libp2p/interface-connection' import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerInfo } from '@libp2p/interface-peer-info' @@ -82,14 +82,14 @@ export class Network extends EventEmitter implements Startable { /** * Send a request and record RTT for latency measurements */ - async * sendRequest (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator { + async * sendRequest (to: PeerId, msg: Message, options: QueryOptions = {}): AsyncGenerator { if (!this.running) { return } this.log('sending %s to %p', msg.type, to) - yield dialingPeerEvent({ peer: to }) - yield sendingQueryEvent({ to, type: msg.type }) + yield dialPeerEvent({ peer: to }, options) + yield sendQueryEvent({ to, type: msg.type }, options) let stream: Stream | undefined @@ -105,9 +105,9 @@ export class Network extends EventEmitter implements Startable { closer: response.closerPeers, providers: response.providerPeers, record: response.record - }) + }, options) } catch (err: any) { - yield queryErrorEvent({ from: to, error: err }) + yield queryErrorEvent({ from: to, error: err }, options) } finally { if (stream != null) { stream.close() @@ -118,14 +118,14 @@ export class Network extends EventEmitter implements Startable { /** * Sends a message without expecting an answer */ - async * sendMessage (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator { + async * sendMessage (to: PeerId, msg: Message, options: QueryOptions = {}): AsyncGenerator { if (!this.running) { return } this.log('sending %s to %p', msg.type, to) - yield dialingPeerEvent({ peer: to }) - yield sendingQueryEvent({ to, type: msg.type }) + yield dialPeerEvent({ peer: to }, options) + yield sendQueryEvent({ to, type: msg.type }, options) let stream: Stream | undefined @@ -135,9 +135,9 @@ export class Network extends EventEmitter implements Startable { await this._writeMessage(stream, msg.serialize(), options) - yield peerResponseEvent({ from: to, messageType: msg.type }) + yield peerResponseEvent({ from: to, messageType: msg.type }, options) } catch (err: any) { - yield queryErrorEvent({ from: to, error: err }) + yield queryErrorEvent({ from: to, error: err }, options) } finally { if (stream != null) { stream.close() diff --git a/src/peer-routing/index.ts b/src/peer-routing/index.ts index 49ba6566..ab0af91c 100644 --- a/src/peer-routing/index.ts +++ b/src/peer-routing/index.ts @@ -13,7 +13,7 @@ import { valueEvent } from '../query/events.js' import * as utils from '../utils.js' -import type { KadDHTComponents, DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, Validators } from '../index.js' +import type { KadDHTComponents, DHTRecord, DialPeerEvent, FinalPeerEvent, QueryEvent, Validators } from '../index.js' import type { Network } from '../network.js' import type { QueryManager, QueryOptions } from '../query/manager.js' import type { QueryFunc } from '../query/types.js' @@ -122,7 +122,7 @@ export class PeerRouting { throw new CodeError('public key missing', 'ERR_PUBLIC_KEY_MISSING') } - yield valueEvent({ from: peer, value: recPeer.publicKey }) + yield valueEvent({ from: peer, value: recPeer.publicKey }, options) } } @@ -144,7 +144,7 @@ export class PeerRouting { yield finalPeerEvent({ from: this.components.peerId, peer: pi - }) + }, options) return } @@ -153,7 +153,10 @@ export class PeerRouting { const findPeerQuery: QueryFunc = async function * ({ peer, signal }) { const request = new Message(MESSAGE_TYPE.FIND_NODE, id.toBytes(), 0) - for await (const event of self.network.sendRequest(peer, request, { signal })) { + for await (const event of self.network.sendRequest(peer, request, { + ...options, + signal + })) { yield event if (event.name === 'PEER_RESPONSE') { @@ -161,7 +164,7 @@ export class PeerRouting { // found the peer if (match != null) { - yield finalPeerEvent({ from: event.from, peer: match }) + yield finalPeerEvent({ from: event.from, peer: match }, options) } } } @@ -178,7 +181,7 @@ export class PeerRouting { } if (!foundPeer) { - yield queryErrorEvent({ from: this.components.peerId, error: new CodeError('Not found', 'ERR_NOT_FOUND') }) + yield queryErrorEvent({ from: this.components.peerId, error: new CodeError('Not found', 'ERR_NOT_FOUND') }, options) } } @@ -186,7 +189,7 @@ export class PeerRouting { * Kademlia 'node lookup' operation on a key, which could be a the * bytes from a multihash or a peer ID */ - async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { + async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { this.log('getClosestPeers to %b', key) const id = await utils.convertBuffer(key) const tablePeers = this.routingTable.closestPeers(id) @@ -199,7 +202,10 @@ export class PeerRouting { self.log('closerPeersSingle %s from %p', uint8ArrayToString(key, 'base32'), peer) const request = new Message(MESSAGE_TYPE.FIND_NODE, key, 0) - yield * self.network.sendRequest(peer, request, { signal }) + yield * self.network.sendRequest(peer, request, { + ...options, + signal + }) } for await (const event of this.queryManager.run(key, getCloserPeersQuery, options)) { @@ -223,7 +229,7 @@ export class PeerRouting { multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr), protocols: peer.protocols } - }) + }, options) } catch (err: any) { if (err.code !== 'ERR_NOT_FOUND') { throw err @@ -238,7 +244,7 @@ export class PeerRouting { * * Note: The peerStore is updated with new addresses found for the given peer. */ - async * getValueOrPeers (peer: PeerId, key: Uint8Array, options: AbortOptions = {}): AsyncGenerator { + async * getValueOrPeers (peer: PeerId, key: Uint8Array, options: AbortOptions = {}): AsyncGenerator { for await (const event of this._getValueSingle(peer, key, options)) { if (event.name === 'PEER_RESPONSE') { if (event.record != null) { @@ -249,7 +255,7 @@ export class PeerRouting { const errMsg = 'invalid record received, discarded' this.log(errMsg) - yield queryErrorEvent({ from: event.from, error: new CodeError(errMsg, 'ERR_INVALID_RECORD') }) + yield queryErrorEvent({ from: event.from, error: new CodeError(errMsg, 'ERR_INVALID_RECORD') }, options) continue } } diff --git a/src/query/events.ts b/src/query/events.ts index fef9ff7f..d838e2b4 100644 --- a/src/query/events.ts +++ b/src/query/events.ts @@ -1,5 +1,6 @@ +import { CustomEvent } from '@libp2p/interfaces/events' import { MESSAGE_TYPE_LOOKUP } from '../message/index.js' -import type { SendingQueryEvent, PeerResponseEvent, DialingPeerEvent, AddingPeerEvent, ValueEvent, ProviderEvent, QueryErrorEvent, FinalPeerEvent } from '../index.js' +import type { SendQueryEvent, PeerResponseEvent, DialPeerEvent, AddPeerEvent, ValueEvent, ProviderEvent, QueryErrorEvent, FinalPeerEvent, QueryOptions } from '../index.js' import type { Message } from '../message/dht.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerInfo } from '@libp2p/interface-peer-info' @@ -10,14 +11,18 @@ export interface QueryEventFields { type: Message.MessageType } -export function sendingQueryEvent (fields: QueryEventFields): SendingQueryEvent { - return { +export function sendQueryEvent (fields: QueryEventFields, options: QueryOptions = {}): SendQueryEvent { + const event: SendQueryEvent = { ...fields, - name: 'SENDING_QUERY', + name: 'SEND_QUERY', type: 0, messageName: fields.type, messageType: MESSAGE_TYPE_LOOKUP.indexOf(fields.type.toString()) } + + options.onProgress?.(new CustomEvent('kad-dht:query:send-query', { detail: event })) + + return event } export interface PeerResponseEventField { @@ -28,8 +33,8 @@ export interface PeerResponseEventField { record?: Libp2pRecord } -export function peerResponseEvent (fields: PeerResponseEventField): PeerResponseEvent { - return { +export function peerResponseEvent (fields: PeerResponseEventField, options: QueryOptions = {}): PeerResponseEvent { + const event: PeerResponseEvent = { ...fields, name: 'PEER_RESPONSE', type: 1, @@ -37,6 +42,10 @@ export function peerResponseEvent (fields: PeerResponseEventField): PeerResponse closer: (fields.closer != null) ? fields.closer : [], providers: (fields.providers != null) ? fields.providers : [] } + + options.onProgress?.(new CustomEvent('kad-dht:query:peer-response', { detail: event })) + + return event } export interface FinalPeerEventFields { @@ -44,12 +53,16 @@ export interface FinalPeerEventFields { peer: PeerInfo } -export function finalPeerEvent (fields: FinalPeerEventFields): FinalPeerEvent { - return { +export function finalPeerEvent (fields: FinalPeerEventFields, options: QueryOptions = {}): FinalPeerEvent { + const event: FinalPeerEvent = { ...fields, name: 'FINAL_PEER', type: 2 } + + options.onProgress?.(new CustomEvent('kad-dht:query:final-peer', { detail: event })) + + return event } export interface ErrorEventFields { @@ -57,12 +70,16 @@ export interface ErrorEventFields { error: Error } -export function queryErrorEvent (fields: ErrorEventFields): QueryErrorEvent { - return { +export function queryErrorEvent (fields: ErrorEventFields, options: QueryOptions = {}): QueryErrorEvent { + const event: QueryErrorEvent = { ...fields, name: 'QUERY_ERROR', type: 3 } + + options.onProgress?.(new CustomEvent('kad-dht:query:query-error', { detail: event })) + + return event } export interface ProviderEventFields { @@ -70,12 +87,16 @@ export interface ProviderEventFields { providers: PeerInfo[] } -export function providerEvent (fields: ProviderEventFields): ProviderEvent { - return { +export function providerEvent (fields: ProviderEventFields, options: QueryOptions = {}): ProviderEvent { + const event: ProviderEvent = { ...fields, name: 'PROVIDER', type: 4 } + + options.onProgress?.(new CustomEvent('kad-dht:query:provider', { detail: event })) + + return event } export interface ValueEventFields { @@ -83,34 +104,46 @@ export interface ValueEventFields { value: Uint8Array } -export function valueEvent (fields: ValueEventFields): ValueEvent { - return { +export function valueEvent (fields: ValueEventFields, options: QueryOptions = {}): ValueEvent { + const event: ValueEvent = { ...fields, name: 'VALUE', type: 5 } + + options.onProgress?.(new CustomEvent('kad-dht:query:value', { detail: event })) + + return event } export interface PeerEventFields { peer: PeerId } -export function addingPeerEvent (fields: PeerEventFields): AddingPeerEvent { - return { +export function addPeerEvent (fields: PeerEventFields, options: QueryOptions = {}): AddPeerEvent { + const event: AddPeerEvent = { ...fields, - name: 'ADDING_PEER', + name: 'ADD_PEER', type: 6 } + + options.onProgress?.(new CustomEvent('kad-dht:query:add-peer', { detail: event })) + + return event } -export interface DialingPeerEventFields { +export interface DialPeerEventFields { peer: PeerId } -export function dialingPeerEvent (fields: DialingPeerEventFields): DialingPeerEvent { - return { +export function dialPeerEvent (fields: DialPeerEventFields, options: QueryOptions = {}): DialPeerEvent { + const event: DialPeerEvent = { ...fields, - name: 'DIALING_PEER', + name: 'DIAL_PEER', type: 7 } + + options.onProgress?.(new CustomEvent('kad-dht:query:dial-peer', { detail: event })) + + return event } diff --git a/src/query/manager.ts b/src/query/manager.ts index cef3e10b..9eb2fe02 100644 --- a/src/query/manager.ts +++ b/src/query/manager.ts @@ -12,11 +12,10 @@ import { import { convertBuffer } from '../utils.js' import { queryPath } from './query-path.js' import type { QueryFunc } from './types.js' -import type { QueryEvent } from '../index.js' +import type { QueryEvent, QueryOptions as RootQueryOptions } from '../index.js' import type { RoutingTable } from '../routing-table/index.js' import type { Metric, Metrics } from '@libp2p/interface-metrics' import type { PeerId } from '@libp2p/interface-peer-id' -import type { AbortOptions } from '@libp2p/interfaces' import type { Startable } from '@libp2p/interfaces/startable' import type { DeferredPromise } from 'p-defer' @@ -37,7 +36,7 @@ export interface QueryManagerComponents { metrics?: Metrics } -export interface QueryOptions extends AbortOptions { +export interface QueryOptions extends RootQueryOptions { queryFuncTimeout?: number isSelfQuery?: boolean } @@ -192,7 +191,8 @@ export class QueryManager implements Startable { cleanUp, queryFuncTimeout: options.queryFuncTimeout, log, - peersSeen + peersSeen, + onProgress: options.onProgress }) }) diff --git a/src/query/query-path.ts b/src/query/query-path.ts index f12219e4..2966b294 100644 --- a/src/query/query-path.ts +++ b/src/query/query-path.ts @@ -7,7 +7,7 @@ import { xor } from 'uint8arrays/xor' import { convertPeerId, convertBuffer } from '../utils.js' import { queryErrorEvent } from './events.js' import type { CleanUpEvents } from './manager.js' -import type { QueryEvent } from '../index.js' +import type { QueryEvent, QueryOptions } from '../index.js' import type { QueryFunc } from '../query/types.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { EventEmitter } from '@libp2p/interfaces/events' @@ -16,7 +16,7 @@ import type { PeerSet } from '@libp2p/peer-collections' const MAX_XOR = BigInt('0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF') -export interface QueryPathOptions { +export interface QueryPathOptions extends QueryOptions { /** * What are we trying to find */ @@ -160,7 +160,7 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { expect(res).to.have.property('value').that.equalBytes(value) }) + it('put - get calls progress handler', async function () { + this.timeout(10 * 1000) + + const key = uint8ArrayFromString('/v/hello') + const value = uint8ArrayFromString('world') + + const [dhtA, dhtB] = await Promise.all([ + tdht.spawn(), + tdht.spawn() + ]) + + // Connect nodes + await tdht.connect(dhtA, dhtB) + + const putProgress = sinon.stub() + + // Exchange data through the dht + await drain(dhtA.put(key, value, { + onProgress: putProgress + })) + + expect(putProgress).to.have.property('called', true) + + const getProgress = sinon.stub() + + await drain(dhtB.get(key, { + onProgress: getProgress + })) + + expect(getProgress).to.have.property('called', true) + }) + it('put - should require a minimum number of peers to have successful puts', async function () { this.timeout(10 * 1000)