Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable block brokers #280

Merged
merged 5 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
"docs": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs -- --exclude packages/interop --excludeExternals",
"docs:no-publish": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs --publish false -- --exclude packages/interop"
},
"dependencies": {
"any-signal": "^4.1.1"
},
"devDependencies": {
"aegir": "^41.0.0",
"npm-run-all": "^4.1.5",
Expand Down
21 changes: 21 additions & 0 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
],
"type": "module",
"types": "./dist/src/index.d.ts",
"typesVersions": {
"*": {
"*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
],
"src/*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
]
}
},
"files": [
"src",
"dist",
Expand All @@ -26,6 +42,10 @@
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./block-providers": {
"types": "./dist/src/block-providers/index.d.ts",
"import": "./dist/src/block-providers/index.js"
}
},
"eslintConfig": {
Expand Down Expand Up @@ -68,6 +88,7 @@
"@libp2p/webrtc": "^3.1.3",
"@libp2p/websockets": "^7.0.2",
"@libp2p/webtransport": "^3.0.3",
"any-signal": "^4.1.1",
"blockstore-core": "^4.0.0",
"cborg": "^4.0.1",
"datastore-core": "^9.0.0",
Expand Down
59 changes: 59 additions & 0 deletions packages/helia/src/block-providers/bitswap-block-provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockProvider } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

export class BitswapBlockProvider implements BlockProvider<
ProgressOptions<BitswapNotifyProgressEvents>,
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
private readonly bitswap: Bitswap
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) {
this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bitswap already verifies blocks pulled from the network. If we like the approach in this PR we should PR bitswap to skip verification and let it be handled here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we did remove verification from bitswap, we should only do it via configuration flag (verifyBlocks (default true)) so current users don't have a hard migration to do validation themselves.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd release a major. Despite it being published for consumption outside IPFS projects, realistically I don't think anyone else is using it bar a few weekend hackers.

})
this.started = false
}

/**
 * Report whether this provider is currently flagged as started.
 *
 * @returns the value of the internal `started` flag, which is set by
 * `start()` and cleared by `stop()`
 */
isStarted (): boolean {
return this.started
}

/**
 * Start the wrapped bitswap instance and then flag this provider as started.
 *
 * The flag is only set once `bitswap.start()` has resolved, so a rejected
 * start leaves `isStarted()` returning its previous value.
 */
async start (): Promise<void> {
await this.bitswap.start()
this.started = true
}

/**
 * Stop the wrapped bitswap instance and then clear the started flag.
 *
 * The flag is only cleared once `bitswap.stop()` has resolved, so a rejected
 * stop leaves `isStarted()` returning its previous value.
 */
async stop (): Promise<void> {
await this.bitswap.stop()
this.started = false
}

/**
 * Forward a block notification to the wrapped bitswap instance.
 *
 * @param cid - CID of the block
 * @param block - bytes of the block
 * @param options - optional progress options, passed through unchanged
 */
notify (cid: CID, block: Uint8Array, options?: ProgressOptions<BitswapNotifyProgressEvents>): void {
this.bitswap.notify(cid, block, options)
}

/**
 * Retrieve a block by delegating to `bitswap.want`.
 *
 * @param cid - CID of the wanted block
 * @param options - optional abort/progress options, passed through unchanged
 * @returns whatever `bitswap.want` resolves to for the passed CID
 */
async get (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}
2 changes: 2 additions & 0 deletions packages/helia/src/block-providers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { BitswapBlockProvider } from './bitswap-block-provider.js'
export { TrustedGatewayBlockProvider } from './trustless-gateway-block-provider.js'
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { logger } from '@libp2p/logger'
import type { BlockProvider } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-provider')

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

/**
* A BlockProvider that accepts a list of trustless gateways that are queried
* for blocks. Individual gateways are randomly chosen.
*/
export class TrustedGatewayBlockProvider implements BlockProvider<
ProgressOptions,
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: URL[]
achingbrain marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Parse every passed gateway address into a `URL` and keep the results, in
 * the same order, as the list of gateways to query.
 *
 * @param urls - gateway base addresses; each one is parsed with `new URL`,
 * which throws if an entry is not a valid URL
 */
constructor (urls: string[]) {
  const parsed: URL[] = []

  for (const address of urls) {
    parsed.push(new URL(address.toString()))
  }

  this.gateways = parsed
}
achingbrain marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Intentionally does nothing: all three arguments are ignored and no gateway
 * is contacted.
 */
notify (cid: CID, block: Uint8Array, options?: ProgressOptions): void {
// no-op
}
achingbrain marked this conversation as resolved.
Show resolved Hide resolved

async get (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// choose a gateway
const url = this.gateways[Math.floor(Math.random() * this.gateways.length)]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Randomly chosen gateway. If a gateway returns a 5xx we may wish to temporarily remove it from rotation and try another?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we definitely should enable some ability to retry other given gateways. IIRC, we discussed in https://pl-strflt.notion.site/Helia-reliable-retrieval-technical-design-golden-path-255-15c4d3c25a404a85b6db8bf3f8d1f310?pvs=4 that just spamming all gateways would be fine for now because we're trying to optimize for successful retrieval.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but I had a moment of doubt when I realised we'd be making 50 HTTP requests for a 10 block DAG.


log('getting block for %c from %s', cid, url)

try {
const block = await getRawBlockFromGateway(url, cid, options.signal)
log('got block for %c from %s', cid, url)

return block
} catch (err: any) {
log.error('failed to get block for %c from %s', cid, url, err)

throw err
}
}
}

/**
 * Fetch the raw bytes of a single block from an HTTP gateway.
 *
 * The request goes to `/ipfs/<cid>?format=raw` on the passed gateway and also
 * carries an `Accept: application/vnd.ipld.raw` header. This function only
 * downloads the bytes; it does not hash them or compare them to the CID.
 *
 * @param url - base URL of the gateway; it is copied, the argument is not modified
 * @param cid - CID of the block to fetch
 * @param signal - optional signal used to abort the request
 * @returns the response body as a Uint8Array
 * @throws Error if the signal was already aborted, if it is aborted while the
 * request is in flight, if the request itself fails, or if the gateway answers
 * with a non-ok HTTP status. Failure messages include the gateway URL and the
 * underlying reason.
 */
async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
  const gwUrl = new URL(url)
  gwUrl.pathname = `/ipfs/${cid.toString()}`

  // necessary as not every gateway supports dag-cbor, but every gateway
  // should support sending the raw block as-is
  gwUrl.search = '?format=raw'

  if (signal?.aborted === true) {
    throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
  }

  try {
    const res = await fetch(gwUrl.toString(), {
      signal,
      headers: {
        // also set header, just in case ?format= is filtered out by some
        // reverse proxy
        Accept: 'application/vnd.ipld.raw'
      },
      cache: 'force-cache'
    })

    if (!res.ok) {
      // thrown inside the try on purpose: the catch below wraps it together
      // with the gateway URL so the HTTP status reaches the caller
      throw new Error(`gateway responded with HTTP ${`${res.status} ${res.statusText}`.trim()}`)
    }

    return new Uint8Array(await res.arrayBuffer())
  } catch (cause) {
    // @ts-expect-error - TS thinks signal?.aborted can only be false now
    // because it was checked for true above.
    if (signal?.aborted === true) {
      throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`)
    }

    // keep the gateway and the underlying reason in the message instead of
    // discarding them, otherwise failures cannot be attributed to a gateway
    const reason = cause instanceof Error ? cause.message : String(cause)

    throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}: ${reason}`)
  }
}
38 changes: 5 additions & 33 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import { type Bitswap, createBitswap } from 'ipfs-bitswap'
import drain from 'it-drain'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { CustomProgressEvent } from 'progress-events'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
Expand All @@ -15,7 +13,6 @@ import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'

const log = logger('helia')

Expand All @@ -31,34 +28,10 @@ export class HeliaImpl implements Helia {
public datastore: Datastore
public pins: Pins

#bitswap?: Bitswap

constructor (init: HeliaImplInit) {
const hashers: MultihashHasher[] = [
sha256,
sha512,
identity,
...(init.hashers ?? [])
]

this.#bitswap = createBitswap(init.libp2p, init.blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})

const networkedStorage = new NetworkedStorage(init.blockstore, {
bitswap: this.#bitswap
blockProviders: init.blockProviders,
hashers: init.hashers
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand All @@ -72,14 +45,13 @@ export class HeliaImpl implements Helia {

async start (): Promise<void> {
await assertDatastoreVersionIsCurrent(this.datastore)

await this.#bitswap?.start()
await start(this.blockstore)
await this.libp2p.start()
}

async stop (): Promise<void> {
await this.libp2p.stop()
await this.#bitswap?.stop()
await stop(this.blockstore)
}

async gc (options: GCOptions = {}): Promise<void> {
Expand Down
43 changes: 42 additions & 1 deletion packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { BitswapBlockProvider } from './block-providers/bitswap-block-provider.js'
import { TrustedGatewayBlockProvider } from './block-providers/trustless-gateway-block-provider.js'
import { HeliaImpl } from './helia.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { BlockProvider } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
Expand Down Expand Up @@ -91,6 +96,12 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
*/
dagWalkers?: DAGWalker[]

/**
* A list of strategies used to fetch blocks when they are not present in
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
* the local blockstore
*/
blockProviders?: BlockProvider[]

/**
* Pass `false` to not start the Helia node
*/
Expand All @@ -114,6 +125,23 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
holdGcLock?: boolean
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Gateways handed to the TrustedGatewayBlockProvider that `createHelia` sets
 * up when the caller supplies no `blockProviders`.
 *
 * The support notes next to each entry were taken from
 * https://ipfs-public-gateway-checker.on.fleek.co/ on 2023-10-03.
 */
const DEFAULT_TRUSTLESS_GATEWAYS: string[] = [
  'https://dweb.link', // IPNS, Origin, and Block/CAR support
  'https://cf-ipfs.com', // IPNS, Origin, and Block/CAR support
  'https://4everland.io', // IPNS, Origin, and Block/CAR support
  'https://w3s.link', // IPNS, Origin, and Block/CAR support
  'https://cloudflare-ipfs.com' // IPNS, and Block/CAR support
]

/**
* Create and return a Helia node
*/
Expand All @@ -131,11 +159,24 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
libp2p = await createLibp2p(datastore, init.libp2p)
}

const hashers: MultihashHasher[] = [
sha256,
sha512,
identity,
...(init.hashers ?? [])
]

const blockProviders = init.blockProviders ?? [
new BitswapBlockProvider(libp2p, blockstore, hashers),
new TrustedGatewayBlockProvider(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
...init,
datastore,
blockstore,
libp2p
libp2p,
blockProviders
})

if (init.start !== false) {
Expand Down
19 changes: 18 additions & 1 deletion packages/helia/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { start, stop, type Startable } from '@libp2p/interface/startable'
import createMortice from 'mortice'
import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks'
import type { Pins } from '@helia/interface/pins'
Expand All @@ -21,10 +22,11 @@ export interface GetOptions extends AbortOptions {
* blockstore (that may be on disk, s3, or something else). If the blocks are
* not present Bitswap will be used to fetch them from network peers.
*/
export class BlockStorage implements Blocks {
export class BlockStorage implements Blocks, Startable {
public lock: Mortice
private readonly child: Blockstore
private readonly pins: Pins
private started: boolean

/**
* Create a new BlockStorage
Expand All @@ -35,6 +37,21 @@ export class BlockStorage implements Blocks {
this.lock = createMortice({
singleProcess: options.holdGcLock
})
this.started = false
}

/**
 * Report whether this storage is currently flagged as started.
 *
 * @returns the value of the internal `started` flag, which is set by
 * `start()` and cleared by `stop()`
 */
isStarted (): boolean {
return this.started
}

/**
 * Pass the wrapped child blockstore to the `start` helper from
 * `@libp2p/interface/startable`, then flag this storage as started.
 *
 * The flag is only set once that call has resolved.
 */
async start (): Promise<void> {
await start(this.child)
this.started = true
}

/**
 * Pass the wrapped child blockstore to the `stop` helper from
 * `@libp2p/interface/startable`, then clear the started flag.
 *
 * The flag is only cleared once that call has resolved.
 */
async stop (): Promise<void> {
await stop(this.child)
this.started = false
}

unwrap (): Blockstore {
Expand Down
Loading