Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

feat: interface pubsub #60

Merged
merged 18 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,28 @@
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"delay": "^4.3.0",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"err-code": "^2.0.0",
"it-goodbye": "^2.0.1",
"it-length-prefixed": "^3.1.0",
"it-pair": "^1.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.0",
"libp2p-crypto": "^0.17.9",
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0",
"multibase": "^3.0.0",
"p-defer": "^3.0.0",
"p-limit": "^2.3.0",
"p-wait-for": "^3.1.0",
"peer-id": "^0.14.0",
"protons": "^2.0.0",
"sinon": "^9.0.2",
"streaming-iterables": "^5.0.2"
"streaming-iterables": "^5.0.2",
"uint8arrays": "^1.1.0"
},
"devDependencies": {
"aegir": "^25.0.0",
Expand Down
218 changes: 218 additions & 0 deletions src/pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
interface-pubsub
==================

The `interface-pubsub` contains the base implementation for a libp2p pubsub router implementation. This interface should be used to implement a pubsub router compatible with libp2p. It includes a test suite that pubsub routers should run, in order to ensure compatibility with libp2p.

## Implementations using this base protocol

You can use the following implementations as examples for building your own pubsub router.

- [libp2p/js-libp2p-floodsub](https://github.com/libp2p/js-libp2p-floodsub)
- [ChainSafe/js-libp2p-gossipsub](https://github.com/ChainSafe/js-libp2p-gossipsub)

## Interface usage

`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algorithm, instead of also needing to create the setup for it.

A pubsub router implementation should start by extending the `interface-pubsub` class and **MUST** override the `_processMessages`, `publish`, `subscribe`, `unsubscribe` and `getTopics` functions, according to the router algorithms.

Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to customize their logic. Implementations overriding `start` and `stop` **MUST** call `super`. The `start` function is responsible for registering the pubsub protocol with libp2p, while the `stop` function is responsible for unregistering the pubsub protocol and closing pubsub connections.

All the remaining functions **MUST NOT** be overwritten.

The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic.

```JavaScript
const Pubsub = require('libp2p-pubsub')

class PubsubImplementation extends Pubsub {
constructor({ peerId, registrar, ...options })
super({
debugName: 'libp2p:pubsub',
multicodecs: '/pubsub-implementation/1.0.0',
peerId: peerId,
registrar: registrar,
signMessages: options.signMessages,
strictSigning: options.strictSigning
})
}

_processMessages(idB58Str, conn, peer) {
// Required to be implemented by the subclass
// Process each message accordingly
}

publish() {
// Required to be implemented by the subclass
}

subscribe() {
// Required to be implemented by the subclass
}

unsubscribe() {
// Required to be implemented by the subclass
}

getTopics() {
// Required to be implemented by the subclass
}
}
```

## API

The interface aims to specify a common interface that all pubsub router implementation should follow.

### Start

Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`.

#### `pubsub.start()`

##### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | resolves once pubsub starts |

### Stop

Stops the pubsub subsystem. The protocol will be unregistered from `libp2p`, which will remove all listeners for the protocol and the established connections will be closed.

#### `pubsub.stop()`

##### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | resolves once pubsub stops |

### Publish

Publish data message to pubsub topics.

#### `pubsub.publish(topics, message)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topics | `Array<string>|string` | set of pubsub topics |
| message | `Uint8Array` | message to publish |

##### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | resolves once the message is published to the network |

### Subscribe

Subscribe to the given topic(s).

#### `pubsub.subscribe(topics, [handler])`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topics | `Array<string>|string` | set of pubsub topics |
| [handler] | `function (msg)` | handler for messages received in the given topics |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the interface should only allow subscription/unsubscription of a single topic at a time.

Attaching a handler to multiple topics at once feels like an anti-pattern, or at least a less primary interface, in the same way we were previously attaching multiple topics to multiple messages at once in publish.

I think this will simplify some of the underlying implementations, too. The body of our subscription code looks roughly like: for (const topic of topics) { /* doSubscribe(topic) */ }. We would be able to remove this outer loop and better support extended behavior.

Also, go-libp2p-pubsub's interface for subscribing is for a single topic at a time, for what its worth.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the API would look better with one topic. Taking into account that we are doing several breaking changes in pubsub, I think we should also change this.

The libp2p API doc also says a single topic, so we were supporting it below in the stack but we were not mentioning it. However, this should be pointed in the migration guide

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about publish? Should we also just accept one topic per API call? That's how we have it in libp2p main docs as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for publish also accepting a single topic per API call

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update it


### Unsubscribe

Unsubscribe from the given topic(s).

#### `pubsub.unsubscribe(topics, [handler])`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topics | `Array<string>|string` | set of pubsub topics |
| [handler] | `function (msg)` | handler for messages received in the given topics |

If **NO** `handler` is provided, all registered handlers to the given topics will be removed.

### Get Topics

Get the list of topics which the peer is subscribed to.

#### `pubsub.getTopics()`

##### Returns

| Type | Description |
|------|-------------|
| `Array<String>` | Array of subscribed topics |

### Get Peers Subscribed to a topic

Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that are subscribed to one topic.

#### `pubsub.getSubscribers(topic)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topic | `string` | pubsub topic |

##### Returns

| Type | Description |
|------|-------------|
| `Array<string>` | Array of base-58 PeerId's |

### Validate

Validates the signature of a message.

#### `pubsub.validate(message)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| message | `Message` | a pubsub message |

#### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | resolves if the message is valid |

## Test suite usage

```js
'use strict'

const tests = require('libp2p-interfaces/src/pubsub/tests')
const YourPubsubRouter = require('../src')

describe('compliance', () => {
let peers
let pubsubNodes = []

tests({
async setup (number = 1, options = {}) {
// Create number pubsub nodes with libp2p
peers = await createPeers({ number })

peers.forEach((peer) => {
const ps = new YourPubsubRouter(peer, options)

pubsubNodes.push(ps)
})

return pubsubNodes
},
async teardown () {
// Clean up any resources created by setup()
await Promise.all(pubsubNodes.map(ps => ps.stop()))
peers.length && await Promise.all(peers.map(peer => peer.stop()))
}
})
})
```
6 changes: 6 additions & 0 deletions src/pubsub/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict'

exports.codes = {
ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE',
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE'
}
Loading