Skip to content

Commit

Permalink
Merge pull request #77 from zettajs/peer-socket-refactor
Browse files Browse the repository at this point in the history
Peer socket refactor
  • Loading branch information
kevinswiber committed Oct 13, 2014
2 parents 38c7419 + db4d72b commit 2d89040
Show file tree
Hide file tree
Showing 20 changed files with 340 additions and 343 deletions.
1 change: 0 additions & 1 deletion lib/api_formats/siren/server.siren.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ module.exports = function(context) {
var entity = {
class: ['server'],
properties: {
id: server.id,
name: server._name
},
actions: [
Expand Down
2 changes: 1 addition & 1 deletion lib/api_resources/devices.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ DevicesResource.prototype.init = function(config) {
DevicesResource.prototype.list = function(env, next) {
var serverId = env.request.headers['zetta-forwarded-server'] || this.server.id;

var localServer = { path: '/servers/'+ serverId };
var localServer = { path: '/servers/'+ encodeURI(serverId) };
var context = { devices: this.server.runtime._jsDevices, loader: localServer, env: env };
env.format.render('devices', context);
next(env);
Expand Down
12 changes: 5 additions & 7 deletions lib/api_resources/peer_management.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,15 @@ PeerManagementBuilder.prototype.build = function() {

PeerManagementBuilder.prototype.entities = function() {
var self = this;

if (this.data && this.data.length) {
this.base.entities = this.data.map(function(peer) {
var peerUrl = peer.url || self.urlHelper.path('/servers/' + peer.id);
var peerUrl = peer.url || self.urlHelper.path('/servers/' + encodeURI(peer.id));

var entity = {
class: ['peer'],
properties: {
id: peer.id,
name: peer.name,
name: peer.id,
direction: peer.direction,
status: peer.status,
error: peer.error,
Expand All @@ -165,9 +164,8 @@ PeerManagementBuilder.prototype.entities = function() {
}

var peerUrlRel = peer.direction === 'initiator' ? rels.root : rels.server;

entity.links = [
{ rel: [rels.self], href: self.urlHelper.join(peer.id) },
{ rel: [rels.self], href: self.urlHelper.join(encodeURI(peer.id)) },
{ rel: [peerUrlRel], href: peerUrl },
{ rel: [rels.monitor], href: peerUrl.replace(/^http/, 'ws') }
];
Expand Down Expand Up @@ -214,7 +212,7 @@ PeerItemBuilder.prototype.build = function() {
PeerItemBuilder.prototype.properties = function() {
this.base.properties = {
id: this.data.id,
name: this.data.name,
name: this.data.id,
direction: this.data.direction,
status: this.data.status,
error: this.data.error,
Expand All @@ -239,7 +237,7 @@ PeerItemBuilder.prototype.actions = function() {

PeerItemBuilder.prototype.links = function() {
var self = this;
var peerUrl = this.data.url || self.urlHelper.path('/servers/' + this.data.id);
var peerUrl = this.data.url || self.urlHelper.path('/servers/' + encodeURI(this.data.id));

var peerUrlRel = self.base.properties.direction === 'initiator' ? rels.root : rels.server;
this.base.links = [
Expand Down
6 changes: 3 additions & 3 deletions lib/api_resources/root.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ RootResource.prototype.list = function(env, next) {
{
title: this.server._name,
rel: [rels.server],
href: env.helpers.url.path('/servers/'+this.server.id)
href: env.helpers.url.path('/servers/' + encodeURI(this.server.id) )
}
]
};
Expand All @@ -39,9 +39,9 @@ RootResource.prototype.list = function(env, next) {
if (results) {
results.forEach(function(peer) {
env.response.body.links.push({
title: peer.name,
title: peer.id,
rel: [rels.peer],
href: env.helpers.url.path('/servers/' + peer.id)
href: env.helpers.url.path('/servers/' + encodeURI(peer.id))
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion lib/api_resources/servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ var ServerResource = module.exports = function(server) {
ServerResource.prototype.getServer = function(env) {
var serverId = env.request.headers['zetta-forwarded-server'] || this.server.id;

return { path: '/servers/' + serverId };
return { path: '/servers/' + encodeURI(serverId) };
};

ServerResource.prototype.init = function(config) {
Expand Down
34 changes: 17 additions & 17 deletions lib/event_broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ var EventBroker = module.exports = function(zetta) {

EventBroker.prototype.peer = function(peer) {
var self = this;
this.peers[peer.serverId] = peer;
this._peerSubscriptions[peer.serverId] = this._peerSubscriptions[peer.serverId] || {};
this.peers[peer.name] = peer;
this._peerSubscriptions[peer.name] = this._peerSubscriptions[peer.name] || {};
// subscribe to topics for interest
Object.keys(this._peerSubscriptions[peer.serverId]).forEach(function(topic) {
Object.keys(this._peerSubscriptions[peer.name]).forEach(function(topic) {
peer.removeAllListeners(topic);
peer.on(topic, function(data) {
self._publish(topic, data);
Expand Down Expand Up @@ -47,7 +47,7 @@ EventBroker.prototype._subscribe = function(query) {
var topic = query.topic;

// is local
if (query.serverId === this.zetta.id) {
if (query.name === this.zetta.id) {
if (!this.subscriptions[topic]) {
this.subscriptions[topic] = { count: 0, listener: null };
}
Expand All @@ -61,13 +61,13 @@ EventBroker.prototype._subscribe = function(query) {
this.subscriptions[topic].count++;
} else {

if (!this._peerSubscriptions[query.serverId]) {
this._peerSubscriptions[query.serverId] = { topic: 0};
if (!this._peerSubscriptions[query.name]) {
this._peerSubscriptions[query.name] = { topic: 0};
}

if (!this._peerSubscriptions[query.serverId][topic]) {
this._peerSubscriptions[query.serverId][topic] = 0;
var peer = this.peers[query.serverId];
if (!this._peerSubscriptions[query.name][topic]) {
this._peerSubscriptions[query.name][topic] = 0;
var peer = this.peers[query.name];
if (peer) {
peer.removeAllListeners(topic);
peer.on(topic, function(data) {
Expand All @@ -76,14 +76,14 @@ EventBroker.prototype._subscribe = function(query) {
peer.subscribe(topic);
}
}
this._peerSubscriptions[query.serverId][topic]++;
this._peerSubscriptions[query.name][topic]++;
}
};

EventBroker.prototype._unsubscribe = function(query) {
var topic = query.topic;

if (query.serverId === this.zetta.id) {
if (query.name === this.zetta.id) {
this.subscriptions[topic].count--;
if (this.subscriptions[topic].count > 0) {
return;
Expand All @@ -92,16 +92,16 @@ EventBroker.prototype._unsubscribe = function(query) {
this.zetta.pubsub.unsubscribe(topic, this.subscriptions[topic].listener);
delete this.subscriptions[topic];
} else {
if (!this._peerSubscriptions[query.serverId]) {
this._peerSubscriptions[query.serverId] = { topic: 1};
if (!this._peerSubscriptions[query.name]) {
this._peerSubscriptions[query.name] = { topic: 1};
}

this._peerSubscriptions[query.serverId][topic]--;
if (this._peerSubscriptions[query.serverId][topic] > 0) {
this._peerSubscriptions[query.name][topic]--;
if (this._peerSubscriptions[query.name][topic] > 0) {
return;
}
delete this._peerSubscriptions[query.serverId][topic];
var peer = this.peers[query.serverId];
delete this._peerSubscriptions[query.name][topic];
var peer = this.peers[query.name];
if (peer) {
peer.unsubscribe(topic);
peer.removeAllListeners(topic);
Expand Down
4 changes: 2 additions & 2 deletions lib/event_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ var EventEmitter = require('events').EventEmitter;
var EventSocket = module.exports = function(ws, query) {
EventEmitter.call(this);
this.ws = ws;
this.query = query; // contains .topic, .serverId
this.query = query; // contains .topic, .name

this.init();
};
util.inherits(EventSocket, EventEmitter);
Expand Down
Loading

0 comments on commit 2d89040

Please sign in to comment.