Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions packages/block-brokers/.aegir.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
import cors from 'cors'
import polka from 'polka'


/**
* Middleware to log requests
*/
const requestLogs = []
let enableLogs = false
function logRequests(req, res, next) {
if (!req.url.includes('/logs') && enableLogs) {
requestLogs.push({
method: req.method,
url: req.url,
headers: req.headers
})
}
next()
}

/** @type {import('aegir').PartialOptions} */
const options = {
test: {
Expand All @@ -10,6 +27,7 @@ const options = {
host: '127.0.0.1'
})
goodGateway.use(cors())
goodGateway.use(logRequests)
goodGateway.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => {
res.writeHead(200, {
'content-type': 'application/octet-stream',
Expand All @@ -28,6 +46,31 @@ const options = {
// "hello"
res.end(Uint8Array.from([104, 101, 108, 108, 111]))
})
goodGateway.all('/ipfs/*', (req, res) => {
// succeeds with empty block for any other CID
res.writeHead(200)
res.end(Uint8Array.from([]))
})

goodGateway.all('/logs', (req, res) => {
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify(requestLogs))
})
goodGateway.all('/logs/enable', (req, res) => {
enableLogs = true
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ message: 'Logging enabled' }))
})
goodGateway.all('/logs/disable', (req, res) => {
enableLogs = false
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ message: 'Logging disabled' }))
})
goodGateway.all('/logs/clear', (req, res) => {
requestLogs.length = 0
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ message: 'Logs cleared' }))
})

await goodGateway.listen()
const { port: goodGatewayPort } = goodGateway.server.address()
Expand Down
12 changes: 10 additions & 2 deletions packages/block-brokers/src/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { createTrustlessGatewaySession } from './session.js'
import { findHttpGatewayProviders } from './utils.js'
import { DEFAULT_ALLOW_INSECURE, DEFAULT_ALLOW_LOCAL } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayBlockBrokerComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { TransformRequestInit } from './trustless-gateway.js'
import type { Routing, BlockRetrievalOptions, BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'
Expand All @@ -22,6 +23,10 @@ export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptio
* @default false
*/
allowLocal?: boolean
/**
* Provide a function that will be called before querying trustless-gateways. This lets you modify the fetch options to pass custom headers or other necessary things.
*/
transformRequestInit?: TransformRequestInit
}

/**
Expand All @@ -31,6 +36,7 @@ export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptio
export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGatewayGetBlockProgressEvents> {
private readonly allowInsecure: boolean
private readonly allowLocal: boolean
private readonly transformRequestInit?: TransformRequestInit
private readonly routing: Routing
private readonly log: Logger
private readonly logger: ComponentLogger
Expand All @@ -41,12 +47,13 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
this.routing = components.routing
this.allowInsecure = init.allowInsecure ?? DEFAULT_ALLOW_INSECURE
this.allowLocal = init.allowLocal ?? DEFAULT_ALLOW_LOCAL
this.transformRequestInit = init.transformRequestInit
}

async retrieve (cid: CID, options: BlockRetrievalOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
const aggregateErrors: Error[] = []

for await (const gateway of findHttpGatewayProviders(cid, this.routing, this.logger, this.allowInsecure, this.allowLocal, options)) {
for await (const gateway of findHttpGatewayProviders(cid, this.routing, this.logger, this.allowInsecure, this.allowLocal, this.transformRequestInit, options)) {
this.log('getting block for %c from %s', cid, gateway.url)

try {
Expand Down Expand Up @@ -93,7 +100,8 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
}, {
...options,
allowLocal: this.allowLocal,
allowInsecure: this.allowInsecure
allowInsecure: this.allowInsecure,
transformRequestInit: this.transformRequestInit
})
}
}
5 changes: 5 additions & 0 deletions packages/block-brokers/src/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { TransformRequestInit } from './trustless-gateway.js'
import type { Routing, BlockBroker } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressEvent } from 'progress-events'
Expand All @@ -25,6 +26,10 @@ export interface TrustlessGatewayBlockBrokerInit {
* @default false
*/
allowLocal?: boolean
/**
* Provide a function that will be called before querying trustless-gateways. This lets you modify the fetch options to pass custom headers or other necessary things.
*/
transformRequestInit?: TransformRequestInit
}

export interface TrustlessGatewayBlockBrokerComponents {
Expand Down
8 changes: 5 additions & 3 deletions packages/block-brokers/src/trustless-gateway/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { findHttpGatewayProviders } from './utils.js'
import { DEFAULT_ALLOW_INSECURE, DEFAULT_ALLOW_LOCAL } from './index.js'
import type { CreateTrustlessGatewaySessionOptions } from './broker.js'
import type { TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { TrustlessGateway } from './trustless-gateway.js'
import type { TransformRequestInit, TrustlessGateway } from './trustless-gateway.js'
import type { BlockRetrievalOptions, Routing } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { AbortOptions } from 'interface-store'
Expand All @@ -18,6 +18,7 @@ class TrustlessGatewaySession extends AbstractSession<TrustlessGateway, Trustles
private readonly routing: Routing
private readonly allowInsecure: boolean
private readonly allowLocal: boolean
private readonly transformRequestInit?: TransformRequestInit

constructor (components: TrustlessGatewaySessionComponents, init: CreateTrustlessGatewaySessionOptions) {
super(components, {
Expand All @@ -28,6 +29,7 @@ class TrustlessGatewaySession extends AbstractSession<TrustlessGateway, Trustles
this.routing = components.routing
this.allowInsecure = init.allowInsecure ?? DEFAULT_ALLOW_INSECURE
this.allowLocal = init.allowLocal ?? DEFAULT_ALLOW_LOCAL
this.transformRequestInit = init.transformRequestInit
}

async queryProvider (cid: CID, provider: TrustlessGateway, options: BlockRetrievalOptions): Promise<Uint8Array> {
Expand All @@ -41,8 +43,8 @@ class TrustlessGatewaySession extends AbstractSession<TrustlessGateway, Trustles
return block
}

async * findNewProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator<TrustlessGateway> {
yield * findHttpGatewayProviders(cid, this.routing, this.logger, this.allowInsecure, this.allowLocal, options)
async * findNewProviders (cid: CID, options: AbortOptions = {}, headers: Record<string, string> = {}): AsyncGenerator<TrustlessGateway> {
yield * findHttpGatewayProviders(cid, this.routing, this.logger, this.allowInsecure, this.allowLocal, this.transformRequestInit, options)
}

toEvictionKey (provider: TrustlessGateway): Uint8Array | string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ export interface TrustlessGatewayStats {
pendingResponses?: number
}

export interface TransformRequestInit {
(defaultReqInit: RequestInit): Promise<RequestInit> | RequestInit
}

export interface TrustlessGatewayComponents {
logger: ComponentLogger
transformRequestInit?: TransformRequestInit
}

/**
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
Expand Down Expand Up @@ -54,9 +63,11 @@ export class TrustlessGateway {
readonly #pendingResponses = new Map<string, Promise<Uint8Array>>()

private readonly log: Logger
private readonly transformRequestInit?: TransformRequestInit

constructor (url: URL | string, logger: ComponentLogger) {
constructor (url: URL | string, { logger, transformRequestInit }: TrustlessGatewayComponents) {
this.url = url instanceof URL ? url : new URL(url)
this.transformRequestInit = transformRequestInit
this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`)
}

Expand Down Expand Up @@ -103,13 +114,17 @@ export class TrustlessGateway {
let pendingResponse: Promise<Uint8Array> | undefined = this.#pendingResponses.get(blockId)
if (pendingResponse == null) {
this.#attempts++
pendingResponse = fetch(gwUrl.toString(), {
const defaultReqInit: RequestInit = {
signal: innerController.signal,
headers: {
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
}).then(async (res) => {
}

const reqInit: RequestInit = this.transformRequestInit != null ? await this.transformRequestInit(defaultReqInit) : defaultReqInit

pendingResponse = fetch(gwUrl.toString(), reqInit).then(async (res) => {
this.log('GET %s %d', gwUrl, res.status)
if (!res.ok) {
this.#errors++
Expand Down
6 changes: 3 additions & 3 deletions packages/block-brokers/src/trustless-gateway/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { isPrivateIp } from '@libp2p/utils/private-ip'
import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import { TrustlessGateway } from './trustless-gateway.js'
import { TrustlessGateway, type TransformRequestInit } from './trustless-gateway.js'
import type { Routing } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -33,7 +33,7 @@ export function filterNonHTTPMultiaddrs (multiaddrs: Multiaddr[], allowInsecure:
})
}

export async function * findHttpGatewayProviders (cid: CID, routing: Routing, logger: ComponentLogger, allowInsecure: boolean, allowLocal: boolean, options?: AbortOptions): AsyncGenerator<TrustlessGateway> {
export async function * findHttpGatewayProviders (cid: CID, routing: Routing, logger: ComponentLogger, allowInsecure: boolean, allowLocal: boolean, transformRequestInit?: TransformRequestInit, options: AbortOptions = {}): AsyncGenerator<TrustlessGateway> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that it's optional, can transformRequestInit go in the options object?

for await (const provider of routing.findProviders(cid, options)) {
// require http(s) addresses
const httpAddresses = filterNonHTTPMultiaddrs(provider.multiaddrs, allowInsecure, allowLocal)
Expand All @@ -48,6 +48,6 @@ export async function * findHttpGatewayProviders (cid: CID, routing: Routing, lo
// etc
const uri = multiaddrToUri(httpAddresses[0])

yield new TrustlessGateway(uri, logger)
yield new TrustlessGateway(uri, { logger, transformRequestInit })
}
}
32 changes: 31 additions & 1 deletion packages/block-brokers/test/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
if (process.env.TRUSTLESS_GATEWAY == null) {
return this.skip()
}
const trustlessGateway = new TrustlessGateway(process.env.TRUSTLESS_GATEWAY, defaultLogger())
const trustlessGateway = new TrustlessGateway(process.env.TRUSTLESS_GATEWAY, { logger: defaultLogger() })

// Call getRawBlock multiple times with the same CID
const promises = Array.from({ length: 10 }, async () => trustlessGateway.getRawBlock(cid))
Expand All @@ -171,4 +171,34 @@
pendingResponses: 0 // the queue is empty
})
})

it('can pass custom headers to the gateway', async function () {
if (process.env.TRUSTLESS_GATEWAY == null) {
return this.skip()
}

Check warning on line 178 in packages/block-brokers/test/trustless-gateway.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/test/trustless-gateway.spec.ts#L177-L178

Added lines #L177 - L178 were not covered by tests
const cid = CID.parse('bafybeic3q4y65yxu3yckr76q63bcvanhklwf6cwxuacnrot6v3gykrgsvq')

const trustlessGateway = new TrustlessGateway(process.env.TRUSTLESS_GATEWAY, {
logger: defaultLogger(),
transformRequestInit: (requestInit) => {
requestInit.headers = {
...requestInit.headers,
'X-My-Header': 'my-value'
}

return requestInit
}
})

await fetch(`${process.env.TRUSTLESS_GATEWAY}/logs/enable`)
await trustlessGateway.getRawBlock(cid)
await fetch(`${process.env.TRUSTLESS_GATEWAY}/logs/disable`)

const reqLogs = await fetch(`${process.env.TRUSTLESS_GATEWAY}/logs`)
const logs = await reqLogs.json()

// assert that fetch was called with the custom header
expect(logs).to.have.lengthOf(1)
expect(logs[0].headers['x-my-header']).to.equal('my-value')
})
})