Skip to content

Commit a3260ff

Browse files
authored
feat: BlockBroker factory support (#284)
1 parent e91f917 commit a3260ff

File tree

17 files changed

+438
-36
lines changed

17 files changed

+438
-36
lines changed

packages/helia/.aegir.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ const options = {
1111
before: async () => {
1212
// use dynamic import otherwise the source may not have been built yet
1313
const { createHelia } = await import('./dist/src/index.js')
14+
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')
1415

1516
const helia = await createHelia({
17+
blockBrokers: [
18+
BitswapBlockBrokerFactory
19+
],
1620
libp2p: {
1721
addresses: {
1822
listen: [

packages/helia/src/block-brokers/bitswap-block-broker.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { createBitswap } from 'ipfs-bitswap'
2-
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
2+
import type { BlockBrokerFactoryFunction } from '@helia/interface'
3+
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
34
import type { Libp2p } from '@libp2p/interface'
45
import type { Startable } from '@libp2p/interface/startable'
56
import type { Blockstore } from 'interface-blockstore'
6-
import type { AbortOptions } from 'interface-store'
77
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
88
import type { CID } from 'multiformats/cid'
99
import type { MultihashHasher } from 'multiformats/hashes/interface'
@@ -52,7 +52,15 @@ ProgressOptions<BitswapWantBlockProgressEvents>
5252
this.bitswap.notify(cid, block, options)
5353
}
5454

55-
async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
55+
async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions<ProgressOptions<BitswapWantBlockProgressEvents>> = {}): Promise<Uint8Array> {
5656
return this.bitswap.want(cid, options)
5757
}
5858
}
59+
60+
/**
61+
* A helper factory for users who want to override Helia `blockBrokers` but
62+
* still want to use the default `BitswapBlockBroker`.
63+
*/
64+
export const BitswapBlockBrokerFactory: BlockBrokerFactoryFunction = (components): BitswapBlockBroker => {
65+
return new BitswapBlockBroker(components.libp2p, components.blockstore, components.hashers)
66+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
export { BitswapBlockBroker } from './bitswap-block-broker.js'
2-
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
1+
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
2+
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'

packages/helia/src/block-brokers/trustless-gateway-block-broker.ts

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { logger } from '@libp2p/logger'
2-
import type { BlockRetriever } from '@helia/interface/blocks'
3-
import type { AbortOptions } from 'interface-store'
2+
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
43
import type { CID } from 'multiformats/cid'
54
import type { ProgressEvent, ProgressOptions } from 'progress-events'
65

@@ -10,9 +9,9 @@ const log = logger('helia:trustless-gateway-block-broker')
109
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
1110
* successes for a given gateway url so that we can prioritize gateways that
1211
* have been more reliable in the past, and ensure that requests are distributed
13-
* across all gateways within a given `TrustedGatewayBlockBroker` instance.
12+
* across all gateways within a given `TrustlessGatewayBlockBroker` instance.
1413
*/
15-
class TrustlessGateway {
14+
export class TrustlessGateway {
1615
public readonly url: URL
1716
/**
1817
* The number of times this gateway has been attempted to be used to fetch a
@@ -30,6 +29,13 @@ class TrustlessGateway {
3029
*/
3130
#errors = 0
3231

32+
/**
33+
* The number of times this gateway has returned an invalid block. A gateway
34+
* that returns the wrong blocks for a CID should be considered for removal
35+
* from the list of gateways to fetch blocks from.
36+
*/
37+
#invalidBlocks = 0
38+
3339
/**
3440
* The number of times this gateway has successfully fetched a block.
3541
*/
@@ -91,7 +97,7 @@ class TrustlessGateway {
9197
* Unused gateways have 100% reliability; They will be prioritized over
9298
* gateways with a 100% success rate to ensure that we attempt all gateways.
9399
*/
94-
get reliability (): number {
100+
reliability (): number {
95101
/**
96102
* if we have never tried to use this gateway, it is considered the most
97103
* reliable until we determine otherwise (prioritize unused gateways)
@@ -100,6 +106,11 @@ class TrustlessGateway {
100106
return 1
101107
}
102108

109+
if (this.#invalidBlocks > 0) {
110+
// this gateway may not be trustworthy..
111+
return -Infinity
112+
}
113+
103114
/**
104115
* We have attempted the gateway, so we need to calculate the reliability
105116
* based on the number of attempts, errors, and successes. Gateways that
@@ -110,6 +121,13 @@ class TrustlessGateway {
110121
*/
111122
return this.#successes / (this.#attempts + (this.#errors * 3))
112123
}
124+
125+
/**
126+
* Increment the number of invalid blocks returned by this gateway.
127+
*/
128+
incrementInvalidBlocks (): void {
129+
this.#invalidBlocks++
130+
}
113131
}
114132

115133
export type TrustlessGatewayGetBlockProgressEvents =
@@ -119,24 +137,39 @@ export type TrustlessGatewayGetBlockProgressEvents =
119137
* A class that accepts a list of trustless gateways that are queried
120138
* for blocks.
121139
*/
122-
export class TrustedGatewayBlockBroker implements BlockRetriever<
140+
export class TrustlessGatewayBlockBroker implements BlockRetriever<
123141
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
124142
> {
125143
private readonly gateways: TrustlessGateway[]
126144

127-
constructor (urls: Array<string | URL>) {
128-
this.gateways = urls.map((url) => new TrustlessGateway(url))
145+
constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
146+
this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
147+
if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
148+
return gatewayOrUrl as TrustlessGateway
149+
}
150+
// eslint-disable-next-line no-console
151+
console.trace('creating new TrustlessGateway for %s', gatewayOrUrl)
152+
return new TrustlessGateway(gatewayOrUrl)
153+
})
129154
}
130155

131-
async retrieve (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
156+
async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
132157
// Loop through the gateways until we get a block or run out of gateways
133-
const sortedGateways = this.gateways.sort((a, b) => b.reliability - a.reliability)
158+
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
134159
const aggregateErrors: Error[] = []
135160
for (const gateway of sortedGateways) {
136161
log('getting block for %c from %s', cid, gateway.url)
137162
try {
138163
const block = await gateway.getRawBlock(cid, options.signal)
139164
log.trace('got block for %c from %s', cid, gateway.url)
165+
try {
166+
await options.validateFn?.(block)
167+
} catch (err) {
168+
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
169+
gateway.incrementInvalidBlocks()
170+
171+
throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
172+
}
140173

141174
return block
142175
} catch (err: unknown) {

packages/helia/src/helia.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { type BlockBroker } from '@helia/interface/blocks'
12
import { start, stop } from '@libp2p/interface/startable'
23
import { logger } from '@libp2p/logger'
34
import drain from 'it-drain'
@@ -19,6 +20,7 @@ const log = logger('helia')
1920
interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
2021
libp2p: T
2122
blockstore: Blockstore
23+
blockBrokers: BlockBroker[]
2224
datastore: Datastore
2325
}
2426

packages/helia/src/index.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import { logger } from '@libp2p/logger'
2525
import { MemoryBlockstore } from 'blockstore-core'
2626
import { MemoryDatastore } from 'datastore-core'
27-
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
27+
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
2828
import { HeliaImpl } from './helia.js'
2929
import { defaultHashers } from './utils/default-hashers.js'
3030
import { createLibp2p } from './utils/libp2p.js'
3131
import { name, version } from './version.js'
3232
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
33-
import type { Helia } from '@helia/interface'
33+
import type { Helia, BlockBrokerFactoryFunction } from '@helia/interface'
3434
import type { BlockBroker } from '@helia/interface/blocks'
3535
import type { Libp2p } from '@libp2p/interface'
3636
import type { Blockstore } from 'interface-blockstore'
@@ -98,7 +98,7 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
9898
* A list of strategies used to fetch blocks when they are not present in
9999
* the local blockstore
100100
*/
101-
blockBrokers?: BlockBroker[]
101+
blockBrokers?: Array<BlockBroker | BlockBrokerFactoryFunction>
102102

103103
/**
104104
* Pass `false` to not start the Helia node
@@ -159,9 +159,19 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
159159

160160
const hashers = defaultHashers(init.hashers)
161161

162-
const blockBrokers = init.blockBrokers ?? [
162+
const blockBrokers: BlockBroker[] = init.blockBrokers?.map((blockBroker: BlockBroker | BlockBrokerFactoryFunction): BlockBroker => {
163+
if (typeof blockBroker !== 'function') {
164+
return blockBroker satisfies BlockBroker
165+
}
166+
return blockBroker({
167+
blockstore,
168+
datastore,
169+
libp2p,
170+
hashers
171+
}) satisfies BlockBroker
172+
}) ?? [
163173
new BitswapBlockBroker(libp2p, blockstore, hashers),
164-
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
174+
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
165175
]
166176

167177
const helia = new HeliaImpl({

packages/helia/src/utils/networked-storage.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import filter from 'it-filter'
66
import forEach from 'it-foreach'
77
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
88
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
9-
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer } from '@helia/interface/blocks'
9+
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks'
1010
import type { AbortOptions } from '@libp2p/interface'
1111
import type { Blockstore } from 'interface-blockstore'
1212
import type { AwaitIterable } from 'interface-store'
@@ -196,39 +196,57 @@ export class NetworkedStorage implements Blocks, Startable {
196196
}
197197
}
198198

199-
/**
200-
* Race block providers cancelling any pending requests once the block has been
201-
* found.
202-
*/
203-
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
199+
export const getCidBlockVerifierFunction = (cid: CID, hashers: MultihashHasher[]): Required<BlockRetrievalOptions>['validateFn'] => {
204200
const hasher = hashers.find(hasher => hasher.code === cid.multihash.code)
205201

206202
if (hasher == null) {
207203
throw new CodeError(`No hasher configured for multihash code 0x${cid.multihash.code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`, 'ERR_UNKNOWN_HASH_ALG')
208204
}
209205

206+
return async (block: Uint8Array): Promise<void> => {
207+
// verify block
208+
const hash = await hasher.digest(block)
209+
210+
if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) {
211+
// if a hash mismatch occurs for a TrustlessGatewayBlockBroker, we should try another gateway
212+
throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
213+
}
214+
}
215+
}
216+
217+
/**
218+
* Race block providers cancelling any pending requests once the block has been
219+
* found.
220+
*/
221+
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
222+
const validateFn = getCidBlockVerifierFunction(cid, hashers)
223+
210224
const controller = new AbortController()
211225
const signal = anySignal([controller.signal, options.signal])
212226

213227
try {
214228
return await Promise.any(
215229
providers.map(async provider => {
216230
try {
231+
let blocksWereValidated = false
217232
const block = await provider.retrieve(cid, {
218233
...options,
219-
signal
234+
signal,
235+
validateFn: async (block: Uint8Array): Promise<void> => {
236+
await validateFn(block)
237+
blocksWereValidated = true
238+
}
220239
})
221240

222-
// verify block
223-
const hash = await hasher.digest(block)
224-
225-
if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) {
226-
throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH')
241+
if (!blocksWereValidated) {
242+
// the blockBroker either did not throw an error when attempting to validate the block
243+
// or did not call the validateFn at all. We should validate the block ourselves
244+
await validateFn(block)
227245
}
228246

229247
return block
230248
} catch (err) {
231-
log.error('could not retrieve block for %c', cid, err)
249+
log.error('could not retrieve verified block for %c', cid, err)
232250
throw err
233251
}
234252
})

0 commit comments

Comments
 (0)