Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/block-brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
"interface-blockstore": "^5.3.1",
"interface-store": "^6.0.2",
"multiformats": "^13.3.1",
"progress-events": "^1.0.1"
"progress-events": "^1.0.1",
"uint8arraylist": "^2.4.8"
},
"devDependencies": {
"@libp2p/crypto": "^5.0.7",
Expand Down
2 changes: 1 addition & 1 deletion packages/block-brokers/src/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
this.log('getting block for %c from %s', cid, gateway.url)

try {
const block = await gateway.getRawBlock(cid, options.signal)
const block = await gateway.getRawBlock(cid, options)
this.log.trace('got block for %c from %s', cid, gateway.url)

try {
Expand Down
6 changes: 6 additions & 0 deletions packages/block-brokers/src/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import type { ProgressEvent } from 'progress-events'

export const DEFAULT_ALLOW_INSECURE = false
export const DEFAULT_ALLOW_LOCAL = false
/**
* The maximum number of bytes to allow when fetching a raw block.
*
* @see https://specs.ipfs.tech/bitswap-protocol/#block-sizes
*/
export const DEFAULT_MAX_SIZE = 2_097_152

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>
Expand Down
2 changes: 1 addition & 1 deletion packages/block-brokers/src/trustless-gateway/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TrustlessGatewaySession extends AbstractSession<TrustlessGateway, Trustles
async queryProvider (cid: CID, provider: TrustlessGateway, options: BlockRetrievalOptions): Promise<Uint8Array> {
this.log('fetching BLOCK for %c from %s', cid, provider.url)

const block = await provider.getRawBlock(cid, options.signal)
const block = await provider.getRawBlock(cid, options)
this.log.trace('got block for %c from %s', cid, provider.url)

await options.validateFn?.(block)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { base64 } from 'multiformats/bases/base64'
import { limitedResponse } from './utils.js'
import { DEFAULT_MAX_SIZE } from './index.js'
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

Expand All @@ -19,6 +21,17 @@ export interface TrustlessGatewayComponents {
transformRequestInit?: TransformRequestInit
}

export interface GetRawBlockOptions {
signal?: AbortSignal

/**
* The maximum number of bytes to allow when fetching a raw block.
*
* @default 2_097_152 (2MiB)
*/
maxSize?: number
}

/**
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
Expand Down Expand Up @@ -89,7 +102,7 @@ export class TrustlessGateway {
* Fetch a raw block from `this.url` following the specification defined at
* https://specs.ipfs.tech/http-gateways/trustless-gateway/
*/
async getRawBlock (cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
async getRawBlock (cid: CID, { signal, maxSize = DEFAULT_MAX_SIZE }: GetRawBlockOptions = {}): Promise<Uint8Array> {
const gwUrl = new URL(this.url.toString())
gwUrl.pathname = `/ipfs/${cid.toString()}`

Expand Down Expand Up @@ -130,8 +143,11 @@ export class TrustlessGateway {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
}
// limited Response ensures the body is less than 2MiB (or configurable maxSize)
// see https://github.com/ipfs/helia/issues/790
const body = await limitedResponse(res, maxSize, { signal: innerController.signal, log: this.log })
this.#successes++
return new Uint8Array(await res.arrayBuffer())
return body
})
this.#pendingResponses.set(blockId, pendingResponse)
}
Expand Down
69 changes: 68 additions & 1 deletion packages/block-brokers/src/trustless-gateway/utils.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { isPrivateIp } from '@libp2p/utils/private-ip'
import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import { Uint8ArrayList } from 'uint8arraylist'
import { TrustlessGateway } from './trustless-gateway.js'
import type { TransformRequestInit } from './trustless-gateway.js'
import type { Routing } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'

Expand Down Expand Up @@ -56,3 +57,69 @@
yield new TrustlessGateway(uri, { logger, transformRequestInit: options.transformRequestInit })
}
}

interface LimitedResponseOptions {
signal?: AbortSignal
log?: Logger
}

/**
* A function that handles ensuring the content-length header and the response body is less than a given byte limit.
*
* If the response contains a content-length header greater than the limit or the actual bytes returned are greater than
* the limit, an error is thrown.
*/
export async function limitedResponse (response: Response, byteLimit: number, options?: LimitedResponseOptions): Promise<Uint8Array> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the property name here as byteLimit but could change to maxSize if we want.. leaving for now

const { signal, log } = options ?? {}
const contentLength = response.headers.get('content-length')
if (contentLength != null) {
const contentLengthNumber = parseInt(contentLength, 10)
if (contentLengthNumber > byteLimit) {
log?.error('content-length header (%d) is greater than the limit (%d)', contentLengthNumber, byteLimit)
if (response.body != null) {
await response.body.cancel().catch(err => {
log?.error('error cancelling response body after content-length check - %e', err)

Check warning on line 81 in packages/block-brokers/src/trustless-gateway/utils.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/utils.ts#L81

Added line #L81 was not covered by tests
})
}
throw new Error(`Content-Length header (${contentLengthNumber}) is greater than the limit (${byteLimit}).`)
}
}

const reader = response.body?.getReader()
if (reader == null) {
// no body to consume if reader is null
throw new Error('Response body is not readable')

Check warning on line 91 in packages/block-brokers/src/trustless-gateway/utils.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/utils.ts#L90-L91

Added lines #L90 - L91 were not covered by tests
}

const chunkList = new Uint8ArrayList()

try {
while (true) {
if (signal?.aborted === true) {
throw new Error('Response body read was aborted.')
}

const { done, value } = await reader.read()
if (done) {
break
}

chunkList.append(value)

if (chunkList.byteLength > byteLimit) {
// No need to consume body here, as we were streaming and hit the limit
throw new Error(`Response body is greater than the limit (${byteLimit}), received ${chunkList.byteLength} bytes.`)
}
}
} finally {
reader.cancel()
.catch(err => {
log?.error('error cancelling reader - %e', err)
})
.finally(() => {
reader.releaseLock()
})
}

return chunkList.subarray()
}
52 changes: 51 additions & 1 deletion packages/block-brokers/test/trustless-gateway-utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr'
import { expect } from 'aegir/chai'
import { filterNonHTTPMultiaddrs } from '../src/trustless-gateway/utils.js'
import { filterNonHTTPMultiaddrs, limitedResponse } from '../src/trustless-gateway/utils.js'

describe('trustless-gateway-block-broker-utils', () => {
it('filterNonHTTPMultiaddrs respects allowInsecure multiaddrs correctly', async function () {
Expand Down Expand Up @@ -51,4 +51,54 @@ describe('trustless-gateway-block-broker-utils', () => {

expect(filtered.length).to.deep.equal(1)
})

it('limitedResponse throws an error when the content-length header is greater than the limit', async function () {
const response = new Response('x'.repeat(1_000_000), {
headers: {
'content-length': '1000000'
}
})

await expect(limitedResponse(response, 100)).to.eventually.be.rejected
.with.property('message', 'Content-Length header (1000000) is greater than the limit (100).')
})

it('limitedResponse throws an error when the response body is greater than the limit', async function () {
const response = new Response('x'.repeat(1_000_000), {
headers: {
'content-length': '100'
}
})

await expect(limitedResponse(response, 100)).to.eventually.be.rejected
.with.property('message', 'Response body is greater than the limit (100), received 1000000 bytes.')
})

it('limitedResponse handles aborted signals', async function () {
const abortController = new AbortController()
let pullCount = 0

const responseBody = new ReadableStream({
start (controller) {
controller.enqueue(Uint8Array.from([0]))
},
pull (controller) {
pullCount++
controller.enqueue(Uint8Array.from([0]))
if (!abortController.signal.aborted && pullCount === 2) {
abortController.abort()
}
},
cancel (controller) {
controller.close()
}
})

await expect(limitedResponse(new Response(responseBody, {
headers: {
'content-length': '1000000'
}
}), 1_000_000, { signal: abortController.signal })).to.eventually.be.rejected
.with.property('message', 'Response body read was aborted.')
})
})
9 changes: 9 additions & 0 deletions packages/interface/src/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ export interface BlockRetrievalOptions <ProgressEvents extends ProgressEvent<any
* and WILL consider the gateway that returned the invalid blocks completely unreliable.
*/
validateFn?(block: Uint8Array): Promise<void>

/**
* The maximum size a block can be in bytes.
*
* Attempts to retrieve a block larger than this will cause an error to be thrown.
*
* @default 2_097_152
*/
maxSize?: number
}

export interface BlockAnnounceOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents> {
Expand Down