-
Notifications
You must be signed in to change notification settings - Fork 459
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds three methods to implement the `/libp2p/fetch/0.0.1` protocol: * `libp2p.fetch(peerId, key) => Promise<Uint8Array>` * `libp2p.fetchService.registerLookupFunction(prefix, lookupFunction)` * `libp2p.fetchService.unRegisterLookupFunction(prefix, [lookupFunction])` Co-authored-by: achingbrain <[email protected]>
- Loading branch information
1 parent
00e4959
commit d8ceb0b
Showing
11 changed files
with
932 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
libp2p-fetch JavaScript Implementation | ||
===================================== | ||
|
||
> Libp2p fetch protocol JavaScript implementation | ||
## Overview | ||
|
||
An implementation of the Fetch protocol as described here: https://github.com/libp2p/specs/tree/master/fetch | ||
|
||
The fetch protocol is a simple protocol for requesting a value corresponding to a key from a peer. | ||
|
||
## Usage | ||
|
||
```javascript | ||
const Libp2p = require('libp2p') | ||
|
||
/** | ||
* Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. | ||
* All keys must be prefixed my the same prefix, which will be used to find the appropriate key | ||
* lookup function. | ||
* @param key - a string | ||
* @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't | ||
* have a corresponding value. | ||
*/ | ||
async function my_subsystem_key_lookup(key) { | ||
// app specific callback to lookup key-value pairs. | ||
} | ||
|
||
// Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/' | ||
const libp2p = Libp2p.create(...) | ||
libp2p.fetchService.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) | ||
|
||
const key = '/my_subsystem_key_prefix/{...}' | ||
const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance | ||
const value = await libp2p.fetch(peerDst, key) | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
'use strict' | ||
|
||
module.exports = { | ||
// https://github.com/libp2p/specs/tree/master/fetch#wire-protocol | ||
PROTOCOL: '/libp2p/fetch/0.0.1' | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
'use strict' | ||
|
||
const debug = require('debug') | ||
const log = Object.assign(debug('libp2p:fetch'), { | ||
error: debug('libp2p:fetch:err') | ||
}) | ||
const errCode = require('err-code') | ||
const { codes } = require('../errors') | ||
const lp = require('it-length-prefixed') | ||
const { FetchRequest, FetchResponse } = require('./proto') | ||
// @ts-ignore it-handshake does not export types | ||
const handshake = require('it-handshake') | ||
const { PROTOCOL } = require('./constants') | ||
|
||
/** | ||
* @typedef {import('../')} Libp2p | ||
* @typedef {import('multiaddr').Multiaddr} Multiaddr | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream | ||
* @typedef {(key: string) => Promise<Uint8Array | null>} LookupFunction | ||
*/ | ||
|
||
/** | ||
* A simple libp2p protocol for requesting a value corresponding to a key from a peer. | ||
* Developers can register one or more lookup function for retrieving the value corresponding to | ||
* a given key. Each lookup function must act on a distinct part of the overall key space, defined | ||
* by a fixed prefix that all keys that should be routed to that lookup function will start with. | ||
*/ | ||
class FetchProtocol { | ||
/** | ||
* @param {Libp2p} libp2p | ||
*/ | ||
constructor (libp2p) { | ||
this._lookupFunctions = new Map() // Maps key prefix to value lookup function | ||
this._libp2p = libp2p | ||
this.handleMessage = this.handleMessage.bind(this) | ||
} | ||
|
||
/** | ||
* Sends a request to fetch the value associated with the given key from the given peer. | ||
* | ||
* @param {PeerId|Multiaddr} peer | ||
* @param {string} key | ||
* @returns {Promise<Uint8Array | null>} | ||
*/ | ||
async fetch (peer, key) { | ||
// @ts-ignore multiaddr might not have toB58String | ||
log('dialing %s to %s', this._protocol, peer.toB58String ? peer.toB58String() : peer) | ||
|
||
const connection = await this._libp2p.dial(peer) | ||
const { stream } = await connection.newStream(FetchProtocol.PROTOCOL) | ||
const shake = handshake(stream) | ||
|
||
// send message | ||
const request = new FetchRequest({ identifier: key }) | ||
shake.write(lp.encode.single(FetchRequest.encode(request).finish())) | ||
|
||
// read response | ||
const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) | ||
switch (response.status) { | ||
case (FetchResponse.StatusCode.OK): { | ||
return response.data | ||
} | ||
case (FetchResponse.StatusCode.NOT_FOUND): { | ||
return null | ||
} | ||
case (FetchResponse.StatusCode.ERROR): { | ||
const errmsg = (new TextDecoder()).decode(response.data) | ||
throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) | ||
} | ||
default: { | ||
throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Invoked when a fetch request is received. Reads the request message off the given stream and | ||
* responds based on looking up the key in the request via the lookup callback that corresponds | ||
* to the key's prefix. | ||
* | ||
* @param {object} options | ||
* @param {MuxedStream} options.stream | ||
* @param {string} options.protocol | ||
*/ | ||
async handleMessage (options) { | ||
const { stream } = options | ||
const shake = handshake(stream) | ||
const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) | ||
|
||
let response | ||
const lookup = this._getLookupFunction(request.identifier) | ||
if (lookup) { | ||
const data = await lookup(request.identifier) | ||
if (data) { | ||
response = new FetchResponse({ status: FetchResponse.StatusCode.OK, data }) | ||
} else { | ||
response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) | ||
} | ||
} else { | ||
const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier) | ||
response = new FetchResponse({ status: FetchResponse.StatusCode.ERROR, data: errmsg }) | ||
} | ||
|
||
shake.write(lp.encode.single(FetchResponse.encode(response).finish())) | ||
} | ||
|
||
/** | ||
* Given a key, finds the appropriate function for looking up its corresponding value, based on | ||
* the key's prefix. | ||
* | ||
* @param {string} key | ||
*/ | ||
_getLookupFunction (key) { | ||
for (const prefix of this._lookupFunctions.keys()) { | ||
if (key.startsWith(prefix)) { | ||
return this._lookupFunctions.get(prefix) | ||
} | ||
} | ||
return null | ||
} | ||
|
||
/** | ||
* Registers a new lookup callback that can map keys to values, for a given set of keys that | ||
* share the same prefix. | ||
* | ||
* @param {string} prefix | ||
* @param {LookupFunction} lookup | ||
*/ | ||
registerLookupFunction (prefix, lookup) { | ||
if (this._lookupFunctions.has(prefix)) { | ||
throw errCode(new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered"), codes.ERR_KEY_ALREADY_EXISTS) | ||
} | ||
this._lookupFunctions.set(prefix, lookup) | ||
} | ||
|
||
/** | ||
* Registers a new lookup callback that can map keys to values, for a given set of keys that | ||
* share the same prefix. | ||
* | ||
* @param {string} prefix | ||
* @param {LookupFunction} [lookup] | ||
*/ | ||
unregisterLookupFunction (prefix, lookup) { | ||
if (lookup != null) { | ||
const existingLookup = this._lookupFunctions.get(prefix) | ||
|
||
if (existingLookup !== lookup) { | ||
return | ||
} | ||
} | ||
|
||
this._lookupFunctions.delete(prefix) | ||
} | ||
} | ||
|
||
FetchProtocol.PROTOCOL = PROTOCOL | ||
|
||
exports = module.exports = FetchProtocol |
Oops, something went wrong.