Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions packages/bitswap/src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import { Stats } from './stats.js'
import { WantList } from './want-list.js'
import type { BitswapOptions, Bitswap as BitswapInterface, BitswapWantProgressEvents, BitswapNotifyProgressEvents, WantListEntry, BitswapComponents } from './index.js'
import type { BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { ComponentLogger, PeerId } from '@libp2p/interface'
import type { BlockBroker, CreateSessionOptions, ProviderOptions } from '@helia/interface'
import type { ComponentLogger, Libp2p, PeerId } from '@libp2p/interface'
import type { Logger } from '@libp2p/logger'
import type { AbortOptions } from '@multiformats/multiaddr'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

export interface WantOptions extends AbortOptions, ProgressOptions<BitswapWantProgressEvents> {
export interface WantOptions extends AbortOptions, ProgressOptions<BitswapWantProgressEvents>, ProviderOptions {
/**
* When searching the routing for providers, stop searching after finding this
* many providers.
Expand All @@ -37,11 +37,13 @@
public blockstore: Blockstore
public peerWantLists: PeerWantLists
public wantList: WantList
public libp2p: Libp2p

constructor (components: BitswapComponents, init: BitswapOptions = {}) {
this.logger = components.logger
this.log = components.logger.forComponent('helia:bitswap')
this.blockstore = components.blockstore
this.libp2p = components.libp2p

// report stats to libp2p metrics
this.stats = new Stats(components)
Expand All @@ -66,7 +68,8 @@
return createBitswapSession({
wantList: this.wantList,
network: this.network,
logger: this.logger
logger: this.logger,
libp2p: this.libp2p

Check warning on line 72 in packages/bitswap/src/bitswap.ts

View check run for this annotation

Codecov / codecov/patch

packages/bitswap/src/bitswap.ts#L71-L72

Added lines #L71 - L72 were not covered by tests
}, options)
}

Expand Down
4 changes: 2 additions & 2 deletions packages/bitswap/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import { Bitswap as BitswapClass } from './bitswap.js'
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents, BitswapNetworkProgressEvents } from './network.js'
import type { WantType } from './pb/message.js'
import type { BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { BlockBroker, CreateSessionOptions, ProviderOptions } from '@helia/interface'
import type { Routing } from '@helia/interface/routing'
import type { Libp2p, AbortOptions, Startable, ComponentLogger, Metrics, PeerId } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
Expand Down Expand Up @@ -59,7 +59,7 @@ export interface Bitswap extends Startable {
/**
* Start a session to retrieve a file from the network
*/
want(cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantProgressEvents>): Promise<Uint8Array>
want(cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantProgressEvents> & ProviderOptions): Promise<Uint8Array>

/**
* Start a session to retrieve a file from the network
Expand Down
18 changes: 15 additions & 3 deletions packages/bitswap/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import type { Provider, Routing } from '@helia/interface/routing'
import type { Libp2p, AbortOptions, Connection, PeerId, IncomingStreamData, Topology, ComponentLogger, IdentifyResult, Counter, Metrics } from '@libp2p/interface'
import type { Logger } from '@libp2p/logger'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

export type BitswapNetworkProgressEvents =
ProgressEvent<'bitswap:network:dial', PeerId>
ProgressEvent<'bitswap:network:dial', PeerId | Multiaddr | Multiaddr[]>

export type BitswapNetworkWantProgressEvents =
ProgressEvent<'bitswap:network:send-wantlist', PeerId> |
Expand Down Expand Up @@ -262,6 +263,17 @@
* Find the providers of a given `cid` and connect to them.
*/
async findAndConnect (cid: CID, options?: WantOptions): Promise<void> {
// connect to initial session providers if supplied
if (options?.providers != null) {
await Promise.all(
options.providers.map(async prov => this.connectTo(prov)
.catch(err => {
this.log.error('could not connect to supplied provider - %e', err)
}))
)
}

Check warning on line 274 in packages/bitswap/src/network.ts

View check run for this annotation

Codecov / codecov/patch

packages/bitswap/src/network.ts#L268-L274

Added lines #L268 - L274 were not covered by tests

// make a routing query to find additional providers
await drain(
map(
take(this.findProviders(cid, options), options?.maxProviders ?? DEFAULT_MAX_PROVIDERS_PER_REQUEST),
Expand Down Expand Up @@ -335,12 +347,12 @@
/**
* Connects to another peer
*/
async connectTo (peer: PeerId, options?: AbortOptions & ProgressOptions<BitswapNetworkProgressEvents>): Promise<Connection> { // eslint-disable-line require-await
async connectTo (peer: PeerId | Multiaddr | Multiaddr[], options?: AbortOptions & ProgressOptions<BitswapNetworkProgressEvents>): Promise<Connection> { // eslint-disable-line require-await
if (!this.running) {
throw new NotStartedError('Network isn\'t running')
}

options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:dial', peer))
options?.onProgress?.(new CustomProgressEvent<PeerId | Multiaddr | Multiaddr[]>('bitswap:network:dial', peer))

// dial and wait for identify - this is to avoid opening a protocol stream
// that we are not going to use but depends on the remote node running the
Expand Down
17 changes: 16 additions & 1 deletion packages/bitswap/src/session.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import { AbstractSession } from '@helia/utils'
import { isPeerId } from '@libp2p/interface'
import type { BitswapWantProgressEvents } from './index.js'
import type { Network } from './network.js'
import type { WantList } from './want-list.js'
import type { CreateSessionOptions } from '@helia/interface'
import type { ComponentLogger, PeerId } from '@libp2p/interface'
import type { ComponentLogger, Libp2p, PeerId } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'

export interface BitswapSessionComponents {
network: Network
wantList: WantList
logger: ComponentLogger
libp2p: Libp2p
}

class BitswapSession extends AbstractSession<PeerId, BitswapWantProgressEvents> {
private readonly wantList: WantList
private readonly network: Network
private readonly libp2p: Libp2p

constructor (components: BitswapSessionComponents, init: CreateSessionOptions) {
super(components, {
Expand All @@ -25,6 +29,7 @@

this.wantList = components.wantList
this.network = components.network
this.libp2p = components.libp2p
}

async queryProvider (cid: CID, provider: PeerId, options: AbortOptions): Promise<Uint8Array> {
Expand Down Expand Up @@ -54,6 +59,16 @@
equals (providerA: PeerId, providerB: PeerId): boolean {
return providerA.equals(providerB)
}

async convertToProvider (provider: PeerId | Multiaddr | Multiaddr[], options?: AbortOptions): Promise<PeerId | undefined> {
if (isPeerId(provider)) {
return provider
}

const connection = await this.libp2p.dial(provider, options)

return connection.remotePeer
}

Check warning on line 71 in packages/bitswap/src/session.ts

View check run for this annotation

Codecov / codecov/patch

packages/bitswap/src/session.ts#L64-L71

Added lines #L64 - L71 were not covered by tests
}

export function createBitswapSession (components: BitswapSessionComponents, init: CreateSessionOptions): BitswapSession {
Expand Down
6 changes: 4 additions & 2 deletions packages/bitswap/test/session.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { createBitswapSession } from '../src/session.js'
import type { Network } from '../src/network.js'
import type { WantList } from '../src/want-list.js'
import type { ComponentLogger } from '@libp2p/interface'
import type { ComponentLogger, Libp2p } from '@libp2p/interface'

interface StubbedBitswapSessionComponents {
network: StubbedInstance<Network>
wantList: StubbedInstance<WantList>
logger: ComponentLogger
libp2p: StubbedInstance<Libp2p>
}

describe('session', () => {
Expand All @@ -35,7 +36,8 @@ describe('session', () => {
wantList: stubInterface<WantList>({
peers: new PeerMap()
}),
logger: defaultLogger()
logger: defaultLogger(),
libp2p: stubInterface<Libp2p>()
}
})

Expand Down
33 changes: 30 additions & 3 deletions packages/block-brokers/src/trustless-gateway/session.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { AbstractSession } from '@helia/utils'
import { findHttpGatewayProviders } from './utils.js'
import { isPeerId } from '@libp2p/interface'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import { TrustlessGateway } from './trustless-gateway.js'
import { filterNonHTTPMultiaddrs, findHttpGatewayProviders } from './utils.js'
import { DEFAULT_ALLOW_INSECURE, DEFAULT_ALLOW_LOCAL } from './index.js'
import type { CreateTrustlessGatewaySessionOptions } from './broker.js'
import type { TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { TransformRequestInit, TrustlessGateway } from './trustless-gateway.js'
import type { TransformRequestInit } from './trustless-gateway.js'
import type { BlockRetrievalOptions, Routing } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { ComponentLogger, PeerId } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'

Expand Down Expand Up @@ -54,6 +58,29 @@
equals (providerA: TrustlessGateway, providerB: TrustlessGateway): boolean {
return providerA.url.toString() === providerB.url.toString()
}

async convertToProvider (provider: PeerId | Multiaddr | Multiaddr[], options?: AbortOptions): Promise<TrustlessGateway | undefined> {
if (isPeerId(provider)) {
return
}

const httpAddresses = filterNonHTTPMultiaddrs(Array.isArray(provider) ? provider : [provider], this.allowInsecure, this.allowLocal)

if (httpAddresses.length === 0) {
return
}

// take first address?
// /ip4/x.x.x.x/tcp/31337/http
// /ip4/x.x.x.x/tcp/31337/https
// etc
const uri = multiaddrToUri(httpAddresses[0])

return new TrustlessGateway(uri, {
logger: this.logger,
transformRequestInit: this.transformRequestInit
})
}

Check warning on line 83 in packages/block-brokers/src/trustless-gateway/session.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/session.ts#L63-L83

Added lines #L63 - L83 were not covered by tests
}

export function createTrustlessGatewaySession (components: TrustlessGatewaySessionComponents, init: CreateTrustlessGatewaySessionOptions): TrustlessGatewaySession {
Expand Down
1 change: 1 addition & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"dependencies": {
"@libp2p/interface": "^2.2.1",
"@multiformats/dns": "^1.0.6",
"@multiformats/multiaddr": "^12.4.0",
"interface-blockstore": "^5.3.1",
"interface-datastore": "^8.3.1",
"interface-store": "^6.0.2",
Expand Down
32 changes: 27 additions & 5 deletions packages/interface/src/blocks.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { PeerId } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'
Expand All @@ -8,6 +10,18 @@ export interface Pair {
block: Uint8Array
}

export interface ProviderOptions {
/**
* An optional list of peers known to host at least the root block of the DAG
* that will be fetched.
*
* If this list is omitted, or if the peers cannot supply the root or any
* child blocks, a `findProviders` routing query will be run to find peers
* that can supply the blocks.
*/
providers?: Array<PeerId | Multiaddr | Multiaddr[]>
}

export type HasBlockProgressEvents =
ProgressEvent<'blocks:put:duplicate', CID> |
ProgressEvent<'blocks:put:providers:notify', CID> |
Expand Down Expand Up @@ -53,11 +67,19 @@ export interface GetOfflineOptions {

export interface Blocks extends Blockstore<ProgressOptions<HasBlockProgressEvents>,
ProgressOptions<PutBlockProgressEvents>, ProgressOptions<PutManyBlocksProgressEvents>,
GetOfflineOptions & ProgressOptions<GetBlockProgressEvents>, GetOfflineOptions & ProgressOptions<GetManyBlocksProgressEvents>, ProgressOptions<GetAllBlocksProgressEvents>,
GetOfflineOptions & ProviderOptions & ProgressOptions<GetBlockProgressEvents>,
GetOfflineOptions & ProviderOptions & ProgressOptions<GetManyBlocksProgressEvents>,
ProgressOptions<GetAllBlocksProgressEvents>,
ProgressOptions<DeleteBlockProgressEvents>, ProgressOptions<DeleteManyBlocksProgressEvents>
> {

createSession(root: CID, options?: CreateSessionOptions<GetBlockProgressEvents>): SessionBlockstore
/**
* A blockstore session only fetches blocks from a subset of network peers to
* reduce network traffic and improve performance.
*
* The initial set of peers can be specified, alternatively a `findProviders`
* routing query will occur to populate the set instead.
*/
createSession(root: CID, options?: CreateSessionOptions<GetOfflineOptions & ProviderOptions & GetBlockProgressEvents>): SessionBlockstore
}

/**
Expand All @@ -80,7 +102,7 @@ ProgressOptions<DeleteBlockProgressEvents>, ProgressOptions<DeleteManyBlocksProg
close(): void
}

export interface BlockRetrievalOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents> {
export interface BlockRetrievalOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents>, ProviderOptions {
/**
* A function that blockBrokers should call prior to returning a block to ensure it can maintain control
* of the block request flow. e.g. TrustedGatewayBlockBroker will use this to ensure that the block
Expand All @@ -94,7 +116,7 @@ export interface BlockAnnounceOptions <ProgressEvents extends ProgressEvent<any,

}

export interface CreateSessionOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents> {
export interface CreateSessionOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents>, ProviderOptions {
/**
* The minimum number of providers for the root CID that are required for
* successful session creation.
Expand Down
64 changes: 64 additions & 0 deletions packages/interop/src/helia-blockstore-sessions.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* eslint-env mocha */

import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import { createHeliaNode } from './fixtures/create-helia.js'
import { createKuboNode } from './fixtures/create-kubo.js'
import type { HeliaLibp2p } from 'helia'
import type { KuboInfo, KuboNode } from 'ipfsd-ctl'

describe('helia - blockstore sessions', () => {
let helia: HeliaLibp2p
let kubo: KuboNode
let kuboInfo: KuboInfo

beforeEach(async () => {
helia = await createHeliaNode()
kubo = await createKuboNode()
kuboInfo = await kubo.info()
})

afterEach(async () => {
if (helia != null) {
await helia.stop()
}

if (kubo != null) {
await kubo.stop()
}
})

it('should be able to receive a block from a peer', async () => {
const input = Uint8Array.from([0, 1, 2, 3, 4])
const { cid } = await kubo.api.add({ content: input }, {
cidVersion: 1,
rawLeaves: true
})

await helia.libp2p.dial(kuboInfo.multiaddrs.map(str => multiaddr(str)))

const output = await helia.blockstore.get(CID.parse(cid.toString()))

expect(output).to.equalBytes(input)
})

it('should be able to receive a block from a session provider', async () => {
const input = Uint8Array.from([0, 1, 2, 3, 4])
const { cid } = await kubo.api.add({ content: input }, {
cidVersion: 1,
rawLeaves: true
})
const root = CID.parse(cid.toString())

const session = helia.blockstore.createSession(root, {
providers: [
kuboInfo.multiaddrs.map(str => multiaddr(str))
]
})

const output = await session.get(root)

expect(output).to.equalBytes(input)
})
})
Loading