Skip to content

Commit

Permalink
feat: add support for Redis Cluster
Browse files Browse the repository at this point in the history
The 'numsub' command only returns the number of subscribers on the
current node, so we have to ask every node of the cluster.

Fixes #267
Fixes #210
  • Loading branch information
laurentP22 authored Jun 4, 2020
1 parent 597a8d1 commit 7a19075
Showing 1 changed file with 47 additions and 24 deletions.
71 changes: 47 additions & 24 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,41 @@ function adapter(uri, opts) {
Adapter.prototype.broadcast.call(this, packet, opts);
};


/**
* Get the number of subscribers of a channel
*
* @param {String} channel
*/

function getNumSub(channel){
if(pub.constructor.name != 'Cluster'){
// RedisClient or Redis
return new Promise(function(resolve,reject) {
pub.send_command('pubsub', ['numsub', channel], function(err, numsub){
if (err) return reject(err);
resolve(parseInt(numsub[1], 10));
});
})
}else{
// Cluster
var nodes = pub.nodes();
return Promise.all(
nodes.map(function(node) {
return node.send_command('pubsub', ['numsub', channel]);
})
).then(function(values) {
var numsub = 0;
values.map(function(value){
numsub += parseInt(value[1], 10);
})
return numsub;
}).catch(function(err){
throw err;
});
}
}

/**
* Gets a list of clients by sid.
*
Expand All @@ -435,14 +470,7 @@ function adapter(uri, opts) {
var self = this;
var requestid = uid2(6);

pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

numsub = parseInt(numsub[1], 10);
getNumSub(self.requestChannel).then(numsub => {
debug('waiting for %d responses to "clients" request', numsub);

var request = JSON.stringify({
Expand All @@ -468,6 +496,9 @@ function adapter(uri, opts) {
};

pub.publish(self.requestChannel, request);
}).catch(err => {
self.emit('error', err);
if (fn) fn(err);
});
};

Expand Down Expand Up @@ -524,14 +555,7 @@ function adapter(uri, opts) {
var self = this;
var requestid = uid2(6);

pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

numsub = parseInt(numsub[1], 10);
getNumSub(self.requestChannel).then(numsub => {
debug('waiting for %d responses to "allRooms" request', numsub);

var request = JSON.stringify({
Expand All @@ -556,6 +580,9 @@ function adapter(uri, opts) {
};

pub.publish(self.requestChannel, request);
}).catch(err => {
self.emit('error', err);
if (fn) fn(err);
});
};

Expand Down Expand Up @@ -700,14 +727,7 @@ function adapter(uri, opts) {
var self = this;
var requestid = uid2(6);

pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

numsub = parseInt(numsub[1], 10);
getNumSub(self.requestChannel).then(numsub => {
debug('waiting for %d responses to "customRequest" request', numsub);

var request = JSON.stringify({
Expand All @@ -733,6 +753,9 @@ function adapter(uri, opts) {
};

pub.publish(self.requestChannel, request);
}).catch(err => {
self.emit('error', err);
if (fn) fn(err);
});
};

Expand Down

0 comments on commit 7a19075

Please sign in to comment.