-
Notifications
You must be signed in to change notification settings - Fork 0
/
WsPubSubServer.js
executable file
·108 lines (83 loc) · 2.88 KB
/
WsPubSubServer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"use strict";
const WebSocketServer = require('ws').Server;
const Diff = require('./static/Diff');
function WsPubSubServer(server, pubsub) {
this.pubsub = pubsub;
pubsub.on('publish', this.pubsubPublish.bind(this));
this.clients = [];
this.states = {};
this.start(server);
}
const proto = WsPubSubServer.prototype;
proto.start = function (server) {
const me = this;
const wss = this.wss = new WebSocketServer({ server: server });
wss.on('connection', function (ws) {
const client = {
ws: ws,
channels: {}
};
me.clients.push(client);
ws.on('message', function (msg) {
me.onMessage(msg, client);
});
ws.on('close', function () {
me.onClose(client);
});
});
};
proto.onMessage = function(msg, client) {
//console.log('got message', msg);
const data = JSON.parse(msg);
switch (data.action) {
case 'subscribe': this.subscribe(data.channel, client);
break;
case 'unsubscribe': this.unsubscribe(data.channel, client);
break;
case 'publish': this.publish(data.channel, data.payload, client);
break;
}
};
proto.onClose = function (client) {
Object.keys(client.channels).forEach(channel => {
this.unsubscribe(channel, client);
});
const idx = this.clients.indexOf(client);
this.clients.splice(idx, 1);
};
proto.pubsubPublish = function (channel, data) {
const diff = Diff.getDiff(this.states[channel], data);
this.states[channel] = data;
if (diff !== Diff.NODIFF_OBJ) {
const msg = JSON.stringify({ channel: channel, payload: diff });
this.clients.forEach(client => {
if (this.publishingClient !== client && client.channels[channel]) {
client.ws.send(msg);
}
});
}
};
proto.subscribe = function(channel, client) {
if (!client.channels[channel]) {
client.channels[channel] = true;
this.pubsub.subscribe(channel);
// send current state
const state = this.states[channel];
if (state) {
client.ws.send(JSON.stringify({ channel: channel, payload: state }));
}
}
};
proto.unsubscribe = function(channel, client) {
if (client.channels[channel]) {
client.channels[channel] = false;
this.pubsub.unsubscribe(channel);
}
};
proto.publish = function (channel, data, client) {
this.publishingClient = client;
this.pubsub.publish(channel, data); // this triggers pubsubPublish()
this.publishingClient = null;
};
if (typeof module !== 'undefined') module.exports = WsPubSubServer;
else (this.pigod || (this.pigod = {})).WsPubSubServer = WsPubSubServer;