Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable block brokers #280

Merged
merged 5 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
],
"type": "module",
"types": "./dist/src/index.d.ts",
"typesVersions": {
"*": {
"*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
],
"src/*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
]
}
},
"files": [
"src",
"dist",
Expand All @@ -26,6 +42,14 @@
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./block-brokers": {
"types": "./dist/src/block-brokers/index.d.ts",
"import": "./dist/src/block-brokers/index.js"
},
"./hashers": {
"types": "./dist/src/utils/default-hashers.d.ts",
"import": "./dist/src/utils/default-hashers.js"
}
},
"eslintConfig": {
Expand Down Expand Up @@ -68,6 +92,7 @@
"@libp2p/webrtc": "^3.1.3",
"@libp2p/websockets": "^7.0.2",
"@libp2p/webtransport": "^3.0.3",
"any-signal": "^4.1.1",
"blockstore-core": "^4.0.0",
"cborg": "^4.0.1",
"datastore-core": "^9.0.0",
Expand Down
58 changes: 58 additions & 0 deletions packages/helia/src/block-brokers/bitswap-block-broker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

export class BitswapBlockBroker implements BlockAnnouncer<ProgressOptions<BitswapNotifyProgressEvents>>, BlockRetriever<
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
private readonly bitswap: Bitswap
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) {
this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})
this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await this.bitswap.start()
this.started = true
}

async stop (): Promise<void> {
await this.bitswap.stop()
this.started = false
}

announce (cid: CID, block: Uint8Array, options?: ProgressOptions<BitswapNotifyProgressEvents>): void {
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}
2 changes: 2 additions & 0 deletions packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { BitswapBlockBroker } from './bitswap-block-broker.js'
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { logger } from '@libp2p/logger'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-provider')

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustedGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: URL[]

constructor (urls: Array<string | URL>) {
this.gateways = urls.map(url => new URL(url.toString()))
}

async retrieve (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// choose a gateway
const url = this.gateways[Math.floor(Math.random() * this.gateways.length)]

log('getting block for %c from %s', cid, url)

try {
const block = await getRawBlockFromGateway(url, cid, options.signal)
log('got block for %c from %s', cid, url)

return block
} catch (err: any) {
log.error('failed to get block for %c from %s', cid, url, err)

throw err
}
}
}

async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = new URL(url)
gwUrl.pathname = `/ipfs/${cid.toString()}`

// necessary as not every gateway supports dag-cbor, but every should support
// sending raw block as-is
gwUrl.search = '?format=raw'

if (signal?.aborted === true) {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
}

try {
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})
if (!res.ok) {
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`)
}
return new Uint8Array(await res.arrayBuffer())
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
if (signal?.aborted === true) {
throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`)
}
throw new Error(`unable to fetch raw block for CID ${cid}`)
}
}
38 changes: 5 additions & 33 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import { type Bitswap, createBitswap } from 'ipfs-bitswap'
import drain from 'it-drain'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { CustomProgressEvent } from 'progress-events'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
Expand All @@ -15,7 +13,6 @@ import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'

const log = logger('helia')

Expand All @@ -31,34 +28,10 @@ export class HeliaImpl implements Helia {
public datastore: Datastore
public pins: Pins

#bitswap?: Bitswap

constructor (init: HeliaImplInit) {
const hashers: MultihashHasher[] = [
sha256,
sha512,
identity,
...(init.hashers ?? [])
]

this.#bitswap = createBitswap(init.libp2p, init.blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})

const networkedStorage = new NetworkedStorage(init.blockstore, {
bitswap: this.#bitswap
blockBrokers: init.blockBrokers,
hashers: init.hashers
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand All @@ -72,14 +45,13 @@ export class HeliaImpl implements Helia {

async start (): Promise<void> {
await assertDatastoreVersionIsCurrent(this.datastore)

await this.#bitswap?.start()
await start(this.blockstore)
await this.libp2p.start()
}

async stop (): Promise<void> {
await this.libp2p.stop()
await this.#bitswap?.stop()
await stop(this.blockstore)
}

async gc (options: GCOptions = {}): Promise<void> {
Expand Down
37 changes: 36 additions & 1 deletion packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
Expand Down Expand Up @@ -91,6 +94,12 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
*/
dagWalkers?: DAGWalker[]

/**
* A list of strategies used to fetch blocks when they are not present in
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
* the local blockstore
*/
blockBrokers?: BlockBroker[]

/**
* Pass `false` to not start the Helia node
*/
Expand All @@ -114,6 +123,23 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
holdGcLock?: boolean
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
}

const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://dweb.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cf-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://w3s.link',

// 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com'
]

/**
* Create and return a Helia node
*/
Expand All @@ -131,11 +157,20 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
libp2p = await createLibp2p(datastore, init.libp2p)
}

const hashers = defaultHashers(init.hashers)

const blockBrokers = init.blockBrokers ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
...init,
datastore,
blockstore,
libp2p
libp2p,
blockBrokers,
hashers
})

if (init.start !== false) {
Expand Down
19 changes: 18 additions & 1 deletion packages/helia/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { start, stop, type Startable } from '@libp2p/interface/startable'
import createMortice from 'mortice'
import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks'
import type { Pins } from '@helia/interface/pins'
Expand All @@ -21,10 +22,11 @@ export interface GetOptions extends AbortOptions {
* blockstore (that may be on disk, s3, or something else). If the blocks are
* not present Bitswap will be used to fetch them from network peers.
*/
export class BlockStorage implements Blocks {
export class BlockStorage implements Blocks, Startable {
public lock: Mortice
private readonly child: Blockstore
private readonly pins: Pins
private started: boolean

/**
* Create a new BlockStorage
Expand All @@ -35,6 +37,21 @@ export class BlockStorage implements Blocks {
this.lock = createMortice({
singleProcess: options.holdGcLock
})
this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await start(this.child)
this.started = true
}

async stop (): Promise<void> {
await stop(this.child)
this.started = false
}

unwrap (): Blockstore {
Expand Down
12 changes: 12 additions & 0 deletions packages/helia/src/utils/default-hashers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import type { MultihashHasher } from 'multiformats/hashes/interface'

export function defaultHashers (hashers: MultihashHasher[] = []): MultihashHasher[] {
return [
sha256,
sha512,
identity,
...hashers
]
}
Loading