Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix: remove subscribe handler
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Aug 21, 2020
1 parent a45ff8d commit 6a967fd
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 55 deletions.
12 changes: 4 additions & 8 deletions src/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ Table of Contents
* [Parameters](#parameters)
* [Returns](#returns-2)
* [Subscribe](#subscribe)
* [pubsub.subscribe(topic, [handler])](#pubsubsubscribetopic-handler)
* [pubsub.subscribe(topic)](#pubsubsubscribetopic)
* [Parameters](#parameters-1)
* [Unsubscribe](#unsubscribe)
* [pubsub.unsubscribe(topic, [handler])](#pubsubunsubscribetopic-handler)
* [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic)
* [Parameters](#parameters-2)
* [Get Topics](#get-topics)
* [pubsub.getTopics()](#pubsubgettopics)
Expand Down Expand Up @@ -133,29 +133,25 @@ Publish data message to pubsub topics.

Subscribe to the given topic.

#### `pubsub.subscribe(topic, [handler])`
#### `pubsub.subscribe(topic)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topic | `string` | pubsub topic |
| [handler] | `function (msg)` | handler for messages received in the given topic |

### Unsubscribe

Unsubscribe from the given topic.

#### `pubsub.unsubscribe(topic, [handler])`
#### `pubsub.unsubscribe(topic)`

##### Parameters

| Name | Type | Description |
|------|------|-------------|
| topic | `string` | pubsub topic |
| [handler] | `function (msg)` | handler for messages received in the given topic |

If **NO** `handler` is provided, all registered handlers to the given topic will be removed.

### Get Topics

Expand Down
17 changes: 3 additions & 14 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,9 @@ class PubsubBaseProtocol extends EventEmitter {
* Subscribes to a given topic.
* @abstract
* @param {string} topic
* @param {function(msg: InMessage)} [handler]
* @returns {void}
*/
subscribe (topic, handler) {
subscribe (topic) {
if (!this.started) {
throw new Error('Pubsub has not started')
}
Expand All @@ -617,27 +616,17 @@ class PubsubBaseProtocol extends EventEmitter {
this.subscriptions.add(topic)
this.peers.forEach((_, id) => this._sendSubscriptions(id, [topic], true))
}

// Bind provided handler
handler && this.on(topic, handler)
}

/**
* Unsubscribe from the given topic.
* @override
* @param {string} topic
* @param {function} [handler]
* @returns {void}
*/
unsubscribe (topic, handler) {
unsubscribe (topic) {
if (!this.started) {
throw new Error('FloodSub is not started')
}

if (!handler) {
this.removeAllListeners(topic)
} else {
this.removeListener(topic, handler)
throw new Error('Pubsub is not started')
}

if (this.subscriptions.has(topic) && this.listenerCount(topic) === 0) {
Expand Down
10 changes: 6 additions & 4 deletions src/pubsub/tests/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,20 @@ module.exports = (common) => {
})

it('can subscribe and unsubscribe correctly', async () => {
const handler = (msg) => {
const handler = () => {
throw new Error('a message should not be received')
}

pubsub.start()
pubsub.subscribe(topic, handler)
pubsub.subscribe(topic)
pubsub.on('topic', handler)

await pWaitFor(() => {
const topics = pubsub.getTopics()
return topics.length === 1 && topics[0] === topic
})

pubsub.unsubscribe(topic, handler)
pubsub.unsubscribe(topic)

await pWaitFor(() => !pubsub.getTopics().length)

Expand All @@ -81,7 +82,8 @@ module.exports = (common) => {

pubsub.start()

pubsub.subscribe(topic, handler)
pubsub.subscribe(topic)
pubsub.on(topic, handler)
await pubsub.publish(topic, data)
await defer.promise

Expand Down
32 changes: 3 additions & 29 deletions test/pubsub/pubsub.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 6] */
'use strict'

const { expect } = require('aegir/utils/chai')
Expand Down Expand Up @@ -74,21 +75,11 @@ describe('pubsub base implementation', () => {

afterEach(() => pubsub.stop())

it('should add subscription without handler', () => {
it('should add subscription', () => {
pubsub.subscribe(topic)

expect(pubsub.subscriptions.size).to.eql(1)
expect(pubsub.subscriptions.has(topic)).to.be.true()
expect(pubsub.listenerCount(topic)).to.eql(0)
})

it('should add subscription with handler', () => {
const handler = (msg) => {}
pubsub.subscribe(topic, handler)

expect(pubsub.subscriptions.size).to.eql(1)
expect(pubsub.subscriptions.has(topic)).to.be.true()
expect(pubsub.listenerCount(topic)).to.eql(1)
})
})

Expand Down Expand Up @@ -173,32 +164,15 @@ describe('pubsub base implementation', () => {

afterEach(() => pubsub.stop())

it('should remove all listeners for a topic if no handler provided', () => {
it('should remove all subscriptions for a topic', () => {
pubsub.subscribe(topic, (msg) => {})
pubsub.subscribe(topic, (msg) => {})

expect(pubsub.listenerCount(topic)).to.eql(2)
expect(pubsub.subscriptions.size).to.eql(1)

pubsub.unsubscribe(topic)

expect(pubsub.subscriptions.size).to.eql(0)
expect(pubsub.listenerCount(topic)).to.eql(0)
})

it('should remove the listeners for a topic if provided', () => {
const handler = (msg) => {}
pubsub.subscribe(topic, handler)
pubsub.subscribe(topic, (msg) => {})

expect(pubsub.listenerCount(topic)).to.eql(2)
expect(pubsub.subscriptions.size).to.eql(1)

pubsub.unsubscribe(topic, handler)

expect(pubsub.listenerCount(topic)).to.eql(1)
// should only remove subscription if no listeners
expect(pubsub.subscriptions.size).to.eql(1)
})
})

Expand Down

0 comments on commit 6a967fd

Please sign in to comment.