Skip to content

Commit

Permalink
feat: GatewayBlockBroker prioritizes & tries all gateways (#281)
Browse files Browse the repository at this point in the history
Co-authored-by: achingbrain <[email protected]>
  • Loading branch information
SgtPooki and achingbrain authored Oct 13, 2023
1 parent 7ef5e79 commit 9bad21b
Show file tree
Hide file tree
Showing 17 changed files with 561 additions and 78 deletions.
4 changes: 4 additions & 0 deletions packages/helia/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ const options = {
before: async () => {
// use dynamic import otherwise the source may not have been built yet
const { createHelia } = await import('./dist/src/index.js')
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')

const helia = await createHelia({
blockBrokers: [
BitswapBlockBrokerFactory
],
libp2p: {
addresses: {
listen: [
Expand Down
14 changes: 11 additions & 3 deletions packages/helia/src/block-brokers/bitswap-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
import type { BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
Expand Down Expand Up @@ -52,7 +52,15 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions<ProgressOptions<BitswapWantBlockProgressEvents>> = {}): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}

/**
* A helper factory for users who want to override Helia `blockBrokers` but
* still want to use the default `BitswapBlockBroker`.
*/
export const BitswapBlockBrokerFactory: BlockBrokerFactoryFunction = (components): BitswapBlockBroker => {
return new BitswapBlockBroker(components.libp2p, components.blockstore, components.hashers)
}
4 changes: 2 additions & 2 deletions packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { BitswapBlockBroker } from './bitswap-block-broker.js'
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'
218 changes: 166 additions & 52 deletions packages/helia/src/block-brokers/trustless-gateway-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,192 @@
import { logger } from '@libp2p/logger'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-provider')

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>
const log = logger('helia:trustless-gateway-block-broker')

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
* have been more reliable in the past, and ensure that requests are distributed
* across all gateways within a given `TrustlessGatewayBlockBroker` instance.
*/
export class TrustedGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: URL[]
export class TrustlessGateway {
public readonly url: URL
/**
* The number of times this gateway has been attempted to be used to fetch a
* block. This includes successful, errored, and aborted attempts. By counting
* even aborted attempts, slow gateways that are out-raced by others will be
* considered less reliable.
*/
#attempts = 0

/**
* The number of times this gateway has errored while attempting to fetch a
* block. This includes `response.ok === false` and any other errors that
* throw while attempting to fetch a block. This does not include aborted
* attempts.
*/
#errors = 0

/**
* The number of times this gateway has returned an invalid block. A gateway
* that returns the wrong blocks for a CID should be considered for removal
* from the list of gateways to fetch blocks from.
*/
#invalidBlocks = 0

/**
* The number of times this gateway has successfully fetched a block.
*/
#successes = 0

constructor (urls: Array<string | URL>) {
this.gateways = urls.map(url => new URL(url.toString()))
constructor (url: URL | string) {
this.url = url instanceof URL ? url : new URL(url)
}

async retrieve (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// choose a gateway
const url = this.gateways[Math.floor(Math.random() * this.gateways.length)]
/**
* Fetch a raw block from `this.url` following the specification defined at
* https://specs.ipfs.tech/http-gateways/trustless-gateway/
*/
async getRawBlock (cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = this.url
gwUrl.pathname = `/ipfs/${cid.toString()}`

log('getting block for %c from %s', cid, url)
// necessary as not every gateway supports dag-cbor, but every should support
// sending raw block as-is
gwUrl.search = '?format=raw'

if (signal?.aborted === true) {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${this.url} was aborted prior to fetch`)
}

try {
const block = await getRawBlockFromGateway(url, cid, options.signal)
log('got block for %c from %s', cid, url)
this.#attempts++
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})
if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
}
this.#successes++
return new Uint8Array(await res.arrayBuffer())
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
if (signal?.aborted === true) {
throw new Error(`fetching raw block for CID ${cid} from gateway ${this.url} was aborted`)
}
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid}`)
}
}

return block
} catch (err: any) {
log.error('failed to get block for %c from %s', cid, url, err)
/**
* Encapsulate the logic for determining whether a gateway is considered
* reliable, for prioritization. This is based on the number of successful attempts made
* and the number of errors encountered.
*
* Unused gateways have 100% reliability; They will be prioritized over
* gateways with a 100% success rate to ensure that we attempt all gateways.
*/
reliability (): number {
/**
* if we have never tried to use this gateway, it is considered the most
* reliable until we determine otherwise (prioritize unused gateways)
*/
if (this.#attempts === 0) {
return 1
}

throw err
if (this.#invalidBlocks > 0) {
// this gateway may not be trustworthy..
return -Infinity
}

/**
* We have attempted the gateway, so we need to calculate the reliability
* based on the number of attempts, errors, and successes. Gateways that
* return a single error should drop their reliability score more than a
* single success increases it.
*
* Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm
*/
return this.#successes / (this.#attempts + (this.#errors * 3))
}

/**
* Increment the number of invalid blocks returned by this gateway.
*/
incrementInvalidBlocks (): void {
this.#invalidBlocks++
}
}

async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = new URL(url)
gwUrl.pathname = `/ipfs/${cid.toString()}`
export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

// necessary as not every gateway supports dag-cbor, but every should support
// sending raw block as-is
gwUrl.search = '?format=raw'
/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

if (signal?.aborted === true) {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
return gatewayOrUrl as TrustlessGateway
}
// eslint-disable-next-line no-console
console.trace('creating new TrustlessGateway for %s', gatewayOrUrl)
return new TrustlessGateway(gatewayOrUrl)
})
}

try {
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})
if (!res.ok) {
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`)
}
return new Uint8Array(await res.arrayBuffer())
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
if (signal?.aborted === true) {
throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`)
async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []
for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
log.error('failed to get block for %c from %s', cid, gateway.url, err)
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
break
}
}
}
throw new Error(`unable to fetch raw block for CID ${cid}`)

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
}
}
2 changes: 2 additions & 0 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type BlockBroker } from '@helia/interface/blocks'
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
Expand All @@ -19,6 +20,7 @@ const log = logger('helia')
interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
libp2p: T
blockstore: Blockstore
blockBrokers: BlockBroker[]
datastore: Datastore
}

Expand Down
20 changes: 15 additions & 5 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { Helia, BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
Expand Down Expand Up @@ -98,7 +98,7 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
* A list of strategies used to fetch blocks when they are not present in
* the local blockstore
*/
blockBrokers?: BlockBroker[]
blockBrokers?: Array<BlockBroker | BlockBrokerFactoryFunction>

/**
* Pass `false` to not start the Helia node
Expand Down Expand Up @@ -159,9 +159,19 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>

const hashers = defaultHashers(init.hashers)

const blockBrokers = init.blockBrokers ?? [
const blockBrokers: BlockBroker[] = init.blockBrokers?.map((blockBroker: BlockBroker | BlockBrokerFactoryFunction): BlockBroker => {
if (typeof blockBroker !== 'function') {
return blockBroker satisfies BlockBroker
}
return blockBroker({
blockstore,
datastore,
libp2p,
hashers
}) satisfies BlockBroker
}) ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
Expand Down
Loading

0 comments on commit 9bad21b

Please sign in to comment.