Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat: interface pubsub (#60)
Browse files Browse the repository at this point in the history
* feat: interface pubsub

* chore: pubsub router tests

* chore: move pubsub abstractions from gossipsub

* chore: address review

* chore: revamp docs

* chore: add emit self tests to interface

* chore: refactor base tests

* chore: publish should only accept one topic per api call

* chore: normalize msg before emit

* chore: do not reset inbound stream

* chore: apply suggestions from code review

Co-authored-by: Jacob Heun <[email protected]>

* chore: address review

* fix: remove subscribe handler

* chore: remove bits from create peerId

Co-authored-by: Jacob Heun <[email protected]>

* chore: remove delay from topic validators tests

* chore: add event emitter information

* fix: topic validator docs

Co-authored-by: Jacob Heun <[email protected]>
  • Loading branch information
vasco-santos and jacobheun authored Aug 25, 2020
1 parent c4be5ee commit ba15a48
Show file tree
Hide file tree
Showing 26 changed files with 3,377 additions and 0 deletions.
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,25 @@
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"delay": "^4.3.0",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"err-code": "^2.0.0",
"it-goodbye": "^2.0.1",
"it-length-prefixed": "^3.1.0",
"it-pair": "^1.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.0",
"libp2p-crypto": "^0.18.0",
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0",
"multibase": "^3.0.0",
"p-defer": "^3.0.0",
"p-limit": "^2.3.0",
"p-wait-for": "^3.1.0",
"peer-id": "^0.14.0",
"protons": "^2.0.0",
"sinon": "^9.0.2",
"streaming-iterables": "^5.0.2",
"uint8arrays": "^1.1.0"
Expand Down
236 changes: 236 additions & 0 deletions src/pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
interface-pubsub
==================

The `interface-pubsub` contains the base implementation for a libp2p pubsub router implementation. This interface should be used to implement a pubsub router compatible with libp2p. It includes a test suite that pubsub routers should run, in order to ensure compatibility with libp2p.

Table of Contents
=================

* [Implementations using this interface](#implementations-using-this-interface)
* [Interface usage](#interface-usage)
* [Extend interface](#extend-interface)
* [Example](#example)
* [API](#api)
* [Start](#start)
* [pubsub.start()](#pubsubstart)
* [Returns](#returns)
* [Stop](#stop)
* [pubsub.stop()](#pubsubstop)
* [Returns](#returns-1)
* [Publish](#publish)
* [pubsub.publish(topics, message)](#pubsubpublishtopics-message)
* [Parameters](#parameters)
* [Returns](#returns-2)
* [Subscribe](#subscribe)
* [pubsub.subscribe(topic)](#pubsubsubscribetopic)
* [Parameters](#parameters-1)
* [Unsubscribe](#unsubscribe)
* [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic)
* [Parameters](#parameters-2)
* [Get Topics](#get-topics)
* [pubsub.getTopics()](#pubsubgettopics)
* [Returns](#returns-3)
* [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic)
* [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic)
* [Parameters](#parameters-3)
* [Returns](#returns-4)
* [Validate](#validate)
* [pubsub.validate(message)](#pubsubvalidatemessage)
* [Parameters](#parameters-4)
* [Returns](#returns-5)
* [Test suite usage](#test-suite-usage)

## Implementations using this interface

You can check the following implementations as examples for building your own pubsub router.

- [libp2p/js-libp2p-floodsub](https://github.com/libp2p/js-libp2p-floodsub)
- [ChainSafe/js-libp2p-gossipsub](https://github.com/ChainSafe/js-libp2p-gossipsub)

## Interface usage

`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management. This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it.

### Extend interface

A pubsub router implementation should start by extending the `interface-pubsub` class and **MUST** override the `_publish` function, according to the router algorithms. This function is responsible for forwarding publish messages to other peers, as well as forwarding received messages if the router provides the `canRelayMessage` option to the base implementation.

Other functions, such as `start`, `stop`, `subscribe`, `unsubscribe`, `_encodeRpc`, `_decodeRpc`, `_processRpcMessage`, `_addPeer` and `_removePeer` may be overwritten if the pubsub implementation needs to customize their logic. Implementations overriding these functions **MUST** call `super`.

The `start` and `stop` functions are responsible for the registration of the pubsub protocol with libp2p. The `stop` function also guarantees that the open streams in the protocol are properly closed.

The `subscribe` and `unsubscribe` functions take care of the subscription management and its inherent message propagation.

When using a custom protobuf definition for message marshalling, you should override `_encodeRpc` and `_decodeRpc` to use the new protobuf instead of the default one.

`_processRpcMessage` is responsible for handling messages received from other peers. This should be extended if further operations/validations are needed by the router.

The `_addPeer` and `_removePeer` functions are called when new peers running the pubsub router protocol establish a connection with the peer. They are used for tracking the open streams between the peers.

All the remaining functions **MUST NOT** be overwritten.

### Example

The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic.

```JavaScript
const Pubsub = require('libp2p-pubsub')

class PubsubImplementation extends Pubsub {
constructor({ libp2p, options })
super({
debugName: 'libp2p:pubsub',
multicodecs: '/pubsub-implementation/1.0.0',
libp2p,
signMessages: options.signMessages,
strictSigning: options.strictSigning
})
}

_publish (message) {
// Required to be implemented by the subclass
// Routing logic for the message
}
}
```

## API

The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message.

### Start

Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`.

#### `pubsub.start()`

### Stop

Stops the pubsub subsystem. The protocol will be unregistered from `libp2p`, which will remove all listeners for the protocol and the established connections will be closed.

#### `pubsub.stop()`

### Publish

Publish data message to pubsub topics.

#### `pubsub.publish(topic, message)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topic | `string` | pubsub topic |
| message | `Uint8Array` | message to publish |

##### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | resolves once the message is published to the network |

### Subscribe

Subscribe to the given topic.

#### `pubsub.subscribe(topic)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topic | `string` | pubsub topic |

### Unsubscribe

Unsubscribe from the given topic.

#### `pubsub.unsubscribe(topic)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topic | `string` | pubsub topic |

### Get Topics

Get the list of topics which the peer is subscribed to.

#### `pubsub.getTopics()`

##### Returns

| Type | Description |
|------|-------------|
| `Array<String>` | Array of subscribed topics |

### Get Peers Subscribed to a topic

Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that are subscribed to one topic.

#### `pubsub.getSubscribers(topic)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topic | `string` | pubsub topic |

##### Returns

| Type | Description |
|------|-------------|
| `Array<string>` | Array of base-58 PeerId's |

### Validate

Validates the signature of a message.

#### `pubsub.validate(message)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| message | `Message` | a pubsub message |

#### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | resolves if the message is valid |

## Test suite usage

```js
'use strict'

const tests = require('libp2p-interfaces/src/pubsub/tests')
const YourPubsubRouter = require('../src')

describe('compliance', () => {
let peers
let pubsubNodes = []

tests({
async setup (number = 1, options = {}) {
// Create number pubsub nodes with libp2p
peers = await createPeers({ number })

peers.forEach((peer) => {
const ps = new YourPubsubRouter(peer, options)

pubsubNodes.push(ps)
})

return pubsubNodes
},
async teardown () {
// Clean up any resources created by setup()
await Promise.all(pubsubNodes.map(ps => ps.stop()))
peers.length && await Promise.all(peers.map(peer => peer.stop()))
}
})
})
```
6 changes: 6 additions & 0 deletions src/pubsub/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict'

exports.codes = {
ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE',
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE'
}
Loading

0 comments on commit ba15a48

Please sign in to comment.