Skip to content

Commit

Permalink
feat!: update to latest libp2p interfaces (#74)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: uses new single-issue libp2p interface modules

Co-authored-by: achingbrain <[email protected]>
  • Loading branch information
wemeetagain and achingbrain authored Jun 15, 2022
1 parent cee5c38 commit fe38340
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 73 deletions.
25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
# libp2p-pubsub <!-- omit in toc -->
# @libp2p/pubsub <!-- omit in toc -->

[![test & maybe release](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml/badge.svg)](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml)
[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![IRC](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io)
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-pubsub.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-pubsub)
[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-pubsub/actions/workflows/js-test-and-release.yml)

> Contains an implementation of the Pubsub interface
> libp2p pubsub base class
## Table of contents <!-- omit in toc -->

- [Install](#install)
- [Usage](#usage)
- [License](#license)
- [Contribution](#contribution)
- [Contribution](#contribution)

## Install

```console
$ npm i @libp2p/pubsub
```

## Usage

Expand All @@ -28,9 +39,9 @@ class MyPubsubImplementation extends PubSubBaseProtocol {

Licensed under either of

* Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / http://www.apache.org/licenses/LICENSE-2.0)
* MIT ([LICENSE-MIT](LICENSE-MIT) / http://opensource.org/licenses/MIT)
- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT ([LICENSE-MIT](LICENSE-MIT) / <http://opensource.org/licenses/MIT>)

### Contribution
## Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
37 changes: 21 additions & 16 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,24 @@
],
"exports": {
".": {
"import": "./dist/src/index.js",
"types": "./dist/src/index.d.ts"
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./errors": {
"import": "./dist/src/errors.js",
"types": "./dist/src/errors.d.ts"
"types": "./dist/src/errors.d.ts",
"import": "./dist/src/errors.js"
},
"./peer-streams": {
"import": "./dist/src/peer-streams.js",
"types": "./dist/src/peer-streams.d.ts"
"types": "./dist/src/peer-streams.d.ts",
"import": "./dist/src/peer-streams.js"
},
"./signature-policy": {
"import": "./dist/src/signature-policy.js",
"types": "./dist/src/signature-policy.d.ts"
"types": "./dist/src/signature-policy.d.ts",
"import": "./dist/src/signature-policy.js"
},
"./utils": {
"import": "./dist/src/utils.js",
"types": "./dist/src/utils.d.ts"
"types": "./dist/src/utils.d.ts",
"import": "./dist/src/utils.js"
}
},
"eslintConfig": {
Expand Down Expand Up @@ -172,26 +172,31 @@
"release": "aegir release"
},
"dependencies": {
"@libp2p/crypto": "^0.22.8",
"@libp2p/interfaces": "^2.0.0",
"@libp2p/components": "^1.0.0",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^1.1.0",
"@libp2p/peer-collections": "^1.0.0",
"@libp2p/peer-id": "^1.1.0",
"@libp2p/topology": "^1.1.0",
"@multiformats/multiaddr": "^10.1.5",
"@libp2p/topology": "^2.0.0",
"@multiformats/multiaddr": "^10.2.0",
"abortable-iterator": "^4.0.2",
"err-code": "^3.0.1",
"iso-random-stream": "^2.0.0",
"it-length-prefixed": "^7.0.1",
"it-pipe": "^2.0.3",
"it-pushable": "^2.0.1",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.3",
"p-queue": "^7.2.0",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@libp2p/interface-connection": "^1.0.1",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-pubsub": "^1.0.1",
"@libp2p/interface-registrar": "^1.0.0",
"@libp2p/peer-id-factory": "^1.0.0",
"aegir": "^37.0.7",
"aegir": "^37.2.0",
"delay": "^5.0.0",
"it-pair": "^2.0.2",
"p-defer": "^4.0.0",
Expand Down
29 changes: 16 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import {
signMessage,
verifySignature
} from './sign.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
import type { Connection } from '@libp2p/interfaces/connection'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interfaces/pubsub'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { Connection } from '@libp2p/interface-connection'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { Components, Initializable } from '@libp2p/interfaces/components'
import { Components, Initializable } from '@libp2p/components'

const log = logger('libp2p:pubsub')

Expand Down Expand Up @@ -63,7 +63,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
public multicodecs: string[]
public components: Components = new Components()

private _registrarTopologyId: string | undefined
private _registrarTopologyIds: string[] | undefined
protected enabled: boolean

constructor (props: PubSubInit) {
Expand Down Expand Up @@ -112,17 +112,18 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi

log('starting')

const registrar = this.components.getRegistrar()
// Incoming streams
// Called after a peer dials us
await this.components.getRegistrar().handle(this.multicodecs, this._onIncomingStream)
await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream)))

// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = createTopology({
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
})
this._registrarTopologyId = await this.components.getRegistrar().register(this.multicodecs, topology)
this._registrarTopologyIds = await Promise.all(this.multicodecs.map(async multicodec => await registrar.register(multicodec, topology)))

log('started')
this.started = true
Expand All @@ -136,12 +137,14 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
return
}

const registrar = this.components.getRegistrar()

// unregister protocol and handlers
if (this._registrarTopologyId != null) {
this.components.getRegistrar().unregister(this._registrarTopologyId)
if (this._registrarTopologyIds != null) {
this._registrarTopologyIds?.map(id => registrar.unregister(id))
}

await this.components.getRegistrar().unhandle(this.multicodecs)
await Promise.all(this.multicodecs.map(async multicodec => await registrar.unhandle(multicodec)))

log('stopping')
for (const peerStreams of this.peers.values()) {
Expand Down Expand Up @@ -553,7 +556,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
/**
* Get a list of the peer-ids that are subscribed to one topic.
*/
getSubscribers (topic: string) {
getSubscribers (topic: string): PeerId[] {
if (!this.started) {
throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET')
}
Expand Down Expand Up @@ -676,7 +679,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
return Array.from(this.subscriptions)
}

getPeers () {
getPeers (): PeerId[] {
if (!this.started) {
throw new Error('Pubsub is not started')
}
Expand Down
6 changes: 3 additions & 3 deletions src/peer-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import * as lp from 'it-length-prefixed'
import { pushable } from 'it-pushable'
import { pipe } from 'it-pipe'
import { abortableSource } from 'abortable-iterator'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Stream } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Stream } from '@libp2p/interface-connection'
import type { Pushable } from 'it-pushable'
import type { PeerStreamEvents } from '@libp2p/interfaces/pubsub'
import type { PeerStreamEvents } from '@libp2p/interface-pubsub'

const log = logger('libp2p-pubsub:peer-streams')

Expand Down
4 changes: 2 additions & 2 deletions src/sign.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toRpcMessage } from './utils.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PeerId } from '@libp2p/interface-peer-id'
import { keys } from '@libp2p/crypto'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { peerIdFromKeys } from '@libp2p/peer-id'

export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:')
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { randomBytes } from 'iso-random-stream'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { sha256 } from 'multiformats/hashes/sha2'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { codes } from './errors.js'
import errcode from 'err-code'
Expand Down
2 changes: 1 addition & 1 deletion test/emit-self.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from './utils/index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import delay from 'delay'
import { Components } from '@libp2p/interfaces/components'
import { Components } from '@libp2p/components'

const protocol = '/pubsub/1.0.0'
const topic = 'foo'
Expand Down
2 changes: 1 addition & 1 deletion test/instance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'aegir/chai'
import { PubSubBaseProtocol } from '../src/index.js'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'

class PubsubProtocol extends PubSubBaseProtocol {
decodeRpc (bytes: Uint8Array): PubSubRPC {
Expand Down
16 changes: 8 additions & 8 deletions test/lifecycle.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import {
MockRegistrar,
mockIncomingStreamEvent
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Registrar } from '@libp2p/interfaces/registrar'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import { Components } from '@libp2p/interfaces/components'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Registrar } from '@libp2p/interface-registrar'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'

class PubsubProtocol extends PubSubBaseProtocol {
decodeRpc (bytes: Uint8Array): PubSubRPC {
Expand Down Expand Up @@ -158,7 +158,7 @@ describe('pubsub base lifecycle', () => {

// Notify peers of connection
await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))

expect(pubsubA.peers.size).to.be.eql(1)
expect(pubsubB.peers.size).to.be.eql(1)
Expand All @@ -179,7 +179,7 @@ describe('pubsub base lifecycle', () => {
sinon.spy(c0, 'newStream')

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))
expect(c0.newStream).to.have.property('callCount', 1)

// @ts-expect-error _removePeer is a protected method
Expand Down Expand Up @@ -219,7 +219,7 @@ describe('pubsub base lifecycle', () => {
sinon.stub(c0, 'newStream').throws(error)

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))

expect(c0.newStream).to.have.property('callCount', 1)
})
Expand All @@ -237,7 +237,7 @@ describe('pubsub base lifecycle', () => {
const [c0, c1] = ConnectionPair()

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))

// Notice peers of disconnect
topologyA?.onDisconnect(peerIdB)
Expand Down
6 changes: 3 additions & 3 deletions test/message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import {
MockRegistrar,
PubsubImplementation
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Message } from '@libp2p/interfaces/pubsub'
import { Components } from '@libp2p/interfaces/components'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Message } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'

describe('pubsub base messages', () => {
let peerId: PeerId
Expand Down
10 changes: 5 additions & 5 deletions test/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import {
PubsubImplementation,
mockIncomingStreamEvent
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PeerId } from '@libp2p/interface-peer-id'
import { PeerSet } from '@libp2p/peer-collections'
import { Components } from '@libp2p/interfaces/components'
import { Components } from '@libp2p/components'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { noSignMsgId } from '../src/utils.js'
import type { PubSubRPC } from '@libp2p/interfaces/src/pubsub'
import type { PubSubRPC } from '@libp2p/interface-pubsub'
import delay from 'delay'
import pDefer from 'p-defer'

Expand Down Expand Up @@ -149,7 +149,7 @@ describe('pubsub base implementation', () => {
const [c0, c1] = ConnectionPair()

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))
})

afterEach(async () => {
Expand Down Expand Up @@ -259,7 +259,7 @@ describe('pubsub base implementation', () => {
const [c0, c1] = ConnectionPair()

await topologyA.onConnect(peerIdB, c0)
await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA))
await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA))
})

afterEach(async () => {
Expand Down
4 changes: 2 additions & 2 deletions test/sign.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import {
import * as PeerIdFactory from '@libp2p/peer-id-factory'
import { randomSeqno, toRpcMessage } from '../src/utils.js'
import { keys } from '@libp2p/crypto'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import type { PeerId } from '@libp2p/interface-peer-id'

function encodeMessage (message: PubSubRPCMessage) {
return RPC.Message.encode(message)
Expand Down
6 changes: 3 additions & 3 deletions test/topic-validators.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import {
MockRegistrar,
PubsubImplementation
} from './utils/index.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PubSubRPC } from '@libp2p/interfaces/pubsub'
import { Components } from '@libp2p/interfaces/components'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PubSubRPC } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'

const protocol = '/pubsub/1.0.0'

Expand Down
2 changes: 1 addition & 1 deletion test/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'aegir/chai'
import * as utils from '../src/utils.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import type { Message, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'

describe('utils', () => {
Expand Down
Loading

0 comments on commit fe38340

Please sign in to comment.