diff --git a/lib/api_formats/siren/server.siren.js b/lib/api_formats/siren/server.siren.js index 6e62bdc..542db93 100644 --- a/lib/api_formats/siren/server.siren.js +++ b/lib/api_formats/siren/server.siren.js @@ -10,7 +10,6 @@ module.exports = function(context) { var entity = { class: ['server'], properties: { - id: server.id, name: server._name }, actions: [ diff --git a/lib/api_resources/devices.js b/lib/api_resources/devices.js index 68b5f05..de8f20d 100644 --- a/lib/api_resources/devices.js +++ b/lib/api_resources/devices.js @@ -16,7 +16,7 @@ DevicesResource.prototype.init = function(config) { DevicesResource.prototype.list = function(env, next) { var serverId = env.request.headers['zetta-forwarded-server'] || this.server.id; - var localServer = { path: '/servers/'+ serverId }; + var localServer = { path: '/servers/'+ encodeURI(serverId) }; var context = { devices: this.server.runtime._jsDevices, loader: localServer, env: env }; env.format.render('devices', context); next(env); diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 43c67b3..bb9ccfb 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -138,16 +138,15 @@ PeerManagementBuilder.prototype.build = function() { PeerManagementBuilder.prototype.entities = function() { var self = this; - if (this.data && this.data.length) { this.base.entities = this.data.map(function(peer) { - var peerUrl = peer.url || self.urlHelper.path('/servers/' + peer.id); + var peerUrl = peer.url || self.urlHelper.path('/servers/' + encodeURI(peer.id)); var entity = { class: ['peer'], properties: { id: peer.id, - name: peer.name, + name: peer.id, direction: peer.direction, status: peer.status, error: peer.error, @@ -165,9 +164,8 @@ PeerManagementBuilder.prototype.entities = function() { } var peerUrlRel = peer.direction === 'initiator' ? rels.root : rels.server; - entity.links = [ - { rel: [rels.self], href: self.urlHelper.join(peer.id) }, + { rel: [rels.self], href: self.urlHelper.join(encodeURI(peer.id)) }, { rel: [peerUrlRel], href: peerUrl }, { rel: [rels.monitor], href: peerUrl.replace(/^http/, 'ws') } ]; @@ -214,7 +212,7 @@ PeerItemBuilder.prototype.build = function() { PeerItemBuilder.prototype.properties = function() { this.base.properties = { id: this.data.id, - name: this.data.name, + name: this.data.id, direction: this.data.direction, status: this.data.status, error: this.data.error, @@ -239,7 +237,7 @@ PeerItemBuilder.prototype.actions = function() { PeerItemBuilder.prototype.links = function() { var self = this; - var peerUrl = this.data.url || self.urlHelper.path('/servers/' + this.data.id); + var peerUrl = this.data.url || self.urlHelper.path('/servers/' + encodeURI(this.data.id)); var peerUrlRel = self.base.properties.direction === 'initiator' ? rels.root : rels.server; this.base.links = [ diff --git a/lib/api_resources/root.js b/lib/api_resources/root.js index c19b5a1..065e7fd 100644 --- a/lib/api_resources/root.js +++ b/lib/api_resources/root.js @@ -24,7 +24,7 @@ RootResource.prototype.list = function(env, next) { { title: this.server._name, rel: [rels.server], - href: env.helpers.url.path('/servers/'+this.server.id) + href: env.helpers.url.path('/servers/' + encodeURI(this.server.id) ) } ] }; @@ -39,9 +39,9 @@ RootResource.prototype.list = function(env, next) { if (results) { results.forEach(function(peer) { env.response.body.links.push({ - title: peer.name, + title: peer.id, rel: [rels.peer], - href: env.helpers.url.path('/servers/' + peer.id) + href: env.helpers.url.path('/servers/' + encodeURI(peer.id)) }); }); } diff --git a/lib/api_resources/servers.js b/lib/api_resources/servers.js index 3f15b0f..3d37cb5 100644 --- a/lib/api_resources/servers.js +++ b/lib/api_resources/servers.js @@ -9,7 +9,7 @@ var ServerResource = module.exports = function(server) { ServerResource.prototype.getServer = function(env) { var serverId = env.request.headers['zetta-forwarded-server'] || this.server.id; - return { path: '/servers/' + serverId }; + return { path: '/servers/' + encodeURI(serverId) }; }; ServerResource.prototype.init = function(config) { diff --git a/lib/event_broker.js b/lib/event_broker.js index 3c652d4..227fa27 100644 --- a/lib/event_broker.js +++ b/lib/event_broker.js @@ -13,10 +13,10 @@ var EventBroker = module.exports = function(zetta) { EventBroker.prototype.peer = function(peer) { var self = this; - this.peers[peer.serverId] = peer; - this._peerSubscriptions[peer.serverId] = this._peerSubscriptions[peer.serverId] || {}; + this.peers[peer.name] = peer; + this._peerSubscriptions[peer.name] = this._peerSubscriptions[peer.name] || {}; // subscribe to topics for interest - Object.keys(this._peerSubscriptions[peer.serverId]).forEach(function(topic) { + Object.keys(this._peerSubscriptions[peer.name]).forEach(function(topic) { peer.removeAllListeners(topic); peer.on(topic, function(data) { self._publish(topic, data); @@ -47,7 +47,7 @@ EventBroker.prototype._subscribe = function(query) { var topic = query.topic; // is local - if (query.serverId === this.zetta.id) { + if (query.name === this.zetta.id) { if (!this.subscriptions[topic]) { this.subscriptions[topic] = { count: 0, listener: null }; } @@ -61,13 +61,13 @@ EventBroker.prototype._subscribe = function(query) { this.subscriptions[topic].count++; } else { - if (!this._peerSubscriptions[query.serverId]) { - this._peerSubscriptions[query.serverId] = { topic: 0}; + if (!this._peerSubscriptions[query.name]) { + this._peerSubscriptions[query.name] = { topic: 0}; } - if (!this._peerSubscriptions[query.serverId][topic]) { - this._peerSubscriptions[query.serverId][topic] = 0; - var peer = this.peers[query.serverId]; + if (!this._peerSubscriptions[query.name][topic]) { + this._peerSubscriptions[query.name][topic] = 0; + var peer = this.peers[query.name]; if (peer) { peer.removeAllListeners(topic); peer.on(topic, function(data) { @@ -76,14 +76,14 @@ EventBroker.prototype._subscribe = function(query) { peer.subscribe(topic); } } - this._peerSubscriptions[query.serverId][topic]++; + this._peerSubscriptions[query.name][topic]++; } }; EventBroker.prototype._unsubscribe = function(query) { var topic = query.topic; - if (query.serverId === this.zetta.id) { + if (query.name === this.zetta.id) { this.subscriptions[topic].count--; if (this.subscriptions[topic].count > 0) { return; @@ -92,16 +92,16 @@ EventBroker.prototype._unsubscribe = function(query) { this.zetta.pubsub.unsubscribe(topic, this.subscriptions[topic].listener); delete this.subscriptions[topic]; } else { - if (!this._peerSubscriptions[query.serverId]) { - this._peerSubscriptions[query.serverId] = { topic: 1}; + if (!this._peerSubscriptions[query.name]) { + this._peerSubscriptions[query.name] = { topic: 1}; } - this._peerSubscriptions[query.serverId][topic]--; - if (this._peerSubscriptions[query.serverId][topic] > 0) { + this._peerSubscriptions[query.name][topic]--; + if (this._peerSubscriptions[query.name][topic] > 0) { return; } - delete this._peerSubscriptions[query.serverId][topic]; - var peer = this.peers[query.serverId]; + delete this._peerSubscriptions[query.name][topic]; + var peer = this.peers[query.name]; if (peer) { peer.unsubscribe(topic); peer.removeAllListeners(topic); diff --git a/lib/event_socket.js b/lib/event_socket.js index 3bf7fc9..cdd96eb 100644 --- a/lib/event_socket.js +++ b/lib/event_socket.js @@ -3,8 +3,8 @@ var EventEmitter = require('events').EventEmitter; var EventSocket = module.exports = function(ws, query) { EventEmitter.call(this); this.ws = ws; - this.query = query; // contains .topic, .serverId - + this.query = query; // contains .topic, .name + this.init(); }; util.inherits(EventSocket, EventEmitter); diff --git a/lib/http_server.js b/lib/http_server.js index 16835a1..3528260 100644 --- a/lib/http_server.js +++ b/lib/http_server.js @@ -25,10 +25,9 @@ var ZettaHttpServer = module.exports = function(zettaInstance) { this.peerRegistry = zettaInstance.peerRegistry; this.eventBroker = new EventBroker(zettaInstance); this.clients = {}; - this.agent = null; - this.peers = []; - this.agents = {}; - this.router = {}; + this.peers = {}; // connected peers + this._disconnectedPeers = {}; + this._collectors = {}; this.server = http.createServer(); this.spdyServer = spdy.createServer({ @@ -94,148 +93,44 @@ ZettaHttpServer.prototype.init = function(cb) { this.wss.on('connection', function(ws) { var match = /^\/peers\/(.+)$/.exec(ws.upgradeReq.url); if (match) { - - var u = url.parse(ws.upgradeReq.url, true); // parse out connectionId - match = /^\/peers\/(.+)$/.exec(u.pathname); - var peerId = match[1]; - var connectionId = u.query.connectionId; - - self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer ' + peerId + ' established'); - - var peer = new PeerSocket(ws, peerId); - - function initiatePeer(tries) { - if (tries === undefined) { - tries = 0; - } - function retry() { - if (tries < 2) { - initiatePeer(++tries); - } else { - ws.close(4000, 'Failed to initiate peer connection'); - } - } - - var requestOpts = { method: 'GET', path: '/', agent: peer.agent }; - var peerRequest = http.request(requestOpts, function(res) { - var buffer = []; - var len = 0; - - res.on('readable', function() { - var data; - while (data = res.read()) { - buffer.push(data); - len += data.length; - } - }); - - res.on('end', function() { - clearTimeout(timer); - - self.zetta.log.emit('log', 'http_server', 'Peer http request finished ' + peerId); - var buf = Buffer.concat(buffer, len); - var root = JSON.parse(buf.toString()); - - var peerName = root.links.filter(function(link) { - return link.rel.indexOf(rels.server) !== -1; - })[0].title; - - var peerItem = { - peerId: peerId, - direction: 'acceptor', - name: peerName, - status: 'connecting' - }; - - self.peerRegistry.add(peerItem, function(err, newPeer) { - if (err) { - console.error('HttpServer: Failed to save peeer information', err); - ws.close(); - return; - } - - peer.serverId = newPeer.id; // set proxy peer id - self.agents[newPeer.id] = peer.agent; - self.agents[peerId] = peer.agent; - self.router[newPeer.id] = peerId; - self.peers.push(peer); - self.eventBroker.peer(peer); - - - function setPeerStatus(status) { - self.peerRegistry.get(newPeer.id, function(err, peer) { - peer = JSON.parse(peer); - if (peer) { - peer.status = status; - self.peerRegistry.save(peer); - } - }); - } - - if (connectionId) { - // confirm connection with peer - peer.confirmConnection(connectionId, function(err) { - if (err) { - console.error('Failed to confirm connection, closing connection for peer ' + peerId); - ws.close(); - } - self.zetta.log.emit('log', 'http_server', 'Peer connection established ' + peerId + ' mapped to ' + newPeer.id); - setPeerStatus('connected'); - - // peer-event - self.zetta.pubsub.publish('_peer/connect', { peer: peer, peerId: peerId, id: newPeer.id }); - }); - } else { - self.zetta.log.emit('log', 'http_server', 'Peer connection established ' + peerId + ' mapped to ' + newPeer.id); - setPeerStatus('connected'); - - // peer-event - self.zetta.pubsub.publish('_peer/connect', { peer: peer, peerId: peerId, id: newPeer.id }); - } - - ws.on('close', function() { - self.zetta.log.emit('log', 'http_server', 'Peer connection closed for ' + peerId + ' mapped to ' + newPeer.id); - self._cleanupPeer(peer); // remove from zetta internals - setPeerStatus('disconnected'); - - // peer-event - self.zetta.pubsub.publish('_peer/disconnect', { peer: peer, peerId: peerId, id: newPeer.id }); - }); - - ws.on('error', function(err) { - self.zetta.log.emit('log', 'http_server', 'Peer connection failed for ' + peerId + ' mapped to ' + newPeer.id); - self._cleanupPeer(peer); // remove from zetta internals - - // peer-event - self.zetta.pubsub.publish('_peer/disconnect', { peer: peer, peerId: peerId, id: newPeer.id }); - - self.peerRegistry.get(newPeer.id, function(err, peer) { - peer = JSON.parse(peer); - if (peer) { - peer.status = 'failed'; - peer.error = err; - self.peerRegistry.save(peer); - } - }); - }); - }); - }); - }); - - peerRequest.on('error', function(err) { - clearTimeout(timer); // stop timeout - retry(); // retry initiation, until limit is reached + var name = /^\/peers\/(.+)$/.exec(url.parse(ws.upgradeReq.url, true).pathname)[1]; + name = decodeURI(name); + self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.'); + + var peer = null; + if (self.peers[name]) { + // peer already connected? + ws.close(4000, 'peer already connected'); + } else if (self._disconnectedPeers[name]) { + // peer has been disconnected but has connected before. + peer = self._disconnectedPeers[name]; + delete self._disconnectedPeers[name]; + peer.init(ws); + } else { + peer = new PeerSocket(ws, name, self.peerRegistry); + + peer.on('connected', function() { + self.peers[name] = peer; + self.eventBroker.peer(peer); + + self.zetta.log.emit('log', 'http_server', 'Peer connection established "' + name + '".'); + self.zetta.pubsub.publish('_peer/connect', { peer: peer }); }); - var timer = setTimeout(function() { - self.zetta.log.emit('log', 'http_server', 'Websocket http request timed out for peer ' + peerId); - peerRequest.abort(); // abort current req - }, 5000); + peer.on('error', function(err) { + self.zetta.log.emit('log', 'http_server', 'Peer connection failed for "' + name + '".'); + self._disconnectedPeers[name] = self.peers[name]; + delete self.peers[name]; + self.zetta.pubsub.publish('_peer/disconnect', { peer: peer }); + }); - peerRequest.end(); + peer.on('end', function() { + self._disconnectedPeers[name] = self.peers[name]; + delete self.peers[name]; + self.zetta.log.emit('log', 'http_server', 'Peer connection closed for "' + name + '".'); + self.zetta.pubsub.publish('_peer/disconnect', { peer: peer }); + }); } - - setImmediate(initiatePeer); } else { self.setupEventSocket(ws); } @@ -272,24 +167,11 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) { return; } var query = querystring.parse(url.parse(ws.upgradeReq.url).query); - query.serverId = match[1]; // set serverId on query + query.name = decodeURI(match[1]); var client = new EventSocket(ws, query); this.eventBroker.client(client); }; -ZettaHttpServer.prototype._cleanupPeer = function(peerSocket) { - var peerId = this.router[peerSocket.serverId]; - delete this.agents[peerSocket.serverId]; - delete this.agents[peerId]; - delete this.router[peerSocket.serverId]; - var idx = this.peers.indexOf(peerSocket); - if (idx !== -1) { - this.peers.splice(idx, 1); - } - - // this.eventBroker.peer(peer); -}; - ZettaHttpServer.prototype.httpRegistration = function(handle) { handle('request', function(env, next) { if (!(env.request.method === 'POST' && env.request.url === '/registration')) { @@ -298,15 +180,15 @@ ZettaHttpServer.prototype.httpRegistration = function(handle) { env.request.getBody(function(err, body) { body = JSON.parse(body.toString()); - var agent = self.agents[body.target]; + var peer = self.peers[body.target]; - if (!agent) { + if (!peer.agent) { env.response.statusCode = 404; return next(env); } env.request.body = new Buffer(JSON.stringify(body.device)); - env.zettaAgent = agent; + env.zettaAgent = peer.agent; next(env); }); }); @@ -314,63 +196,58 @@ ZettaHttpServer.prototype.httpRegistration = function(handle) { //ZettaHttpServer.prototype.proxyToPeer = function(handle) { ZettaHttpServer.prototype.proxyToPeer = function(env, next) { - //handle('request', function(env, next) { var self = this; - var req = env.request; - var res = env.response; - if (!self.peers.length) { - res.statusCode = 404; - res.end(); - return; - } - var messageId = ++self.idCounter; + var req = env.request; + var res = env.response; - // change this to handle multiple fogs - self.clients[messageId] = res;//req.socket; Will need socket for event broadcast. + var messageId = ++self.idCounter; - var appName = req.url.split('/')[2]; + // change this to handle multiple fogs + self.clients[messageId] = res;//req.socket; Will need socket for event broadcast. - req.headers['zetta-message-id'] = messageId; - req.headers['zetta-forwarded-server'] = appName; + var name = decodeURI(req.url.split('/')[2]); - var agent = env.zettaAgent || self.agents[appName]; - if (!agent){ - res.statusCode = 404; - res.end(); - return; - } + req.headers['zetta-message-id'] = messageId; + req.headers['zetta-forwarded-server'] = name; - var path = req.url.replace(appName, self.router[appName]); - var opts = { method: req.method, headers: req.headers, path: path, agent: agent }; - var request = http.request(opts, function(response) { - var id = response.headers['zetta-message-id']; - var res = self.clients[id]; + var peer = self.peers[name]; + if (!peer){ + res.statusCode = 404; + res.end(); + return; + } - if (!res) { - response.statusCode = 404; - return; - } + var agent = env.zettaAgent || peer.agent; - Object.keys(response.headers).forEach(function(header) { - if (header !== 'zetta-message-id') { - res.setHeader(header, response.headers[header]); - } - }); + var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; + var request = http.request(opts, function(response) { + var id = response.headers['zetta-message-id']; + var res = self.clients[id]; - response.pipe(res); + if (!res) { + response.statusCode = 404; + return; + } - response.on('finish', function() { - delete self.clients[id]; - next(env); - }); + Object.keys(response.headers).forEach(function(header) { + if (header !== 'zetta-message-id') { + res.setHeader(header, response.headers[header]); + } + }); + + response.pipe(res); + response.on('finish', function() { + delete self.clients[id]; + next(env); }); - if (req.body) { - request.end(req.body); - } else { - req.pipe(request); - } - //}); + }); + + if (req.body) { + request.end(req.body); + } else { + req.pipe(request); + } }; diff --git a/lib/peer_client.js b/lib/peer_client.js index 5d0c3e6..d347ff5 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -15,7 +15,7 @@ function backoffTime() { var PeerClient = module.exports = function(url, server) { var wsUrl = url.replace(/^http/, 'ws'); - var peerPath = '/peers/' + server.id; + var peerPath = '/peers/' + server._name; if(wsUrl.indexOf('/', wsUrl.length - 1) === -1) { wsUrl = wsUrl + peerPath; } else { diff --git a/lib/peer_registry.js b/lib/peer_registry.js index 022eb29..049d1e6 100644 --- a/lib/peer_registry.js +++ b/lib/peer_registry.js @@ -33,9 +33,10 @@ PeerRegistry.prototype.add = function(peer, cb) { var peerQuery = { match: function(item) { - if (peer.name && item.name === peer.name) { + if (peer.id && item.id === peer.id) { return true; } + if (peer.url && peer.url === item.url) { return true; } @@ -49,10 +50,9 @@ PeerRegistry.prototype.add = function(peer, cb) { } var result = (results && results.length) ? results[0] : null; if (result) { - result.peerId = peer.id; result.status = peer.status; } - + peer = result || peer; if (!peer.id) { @@ -61,7 +61,7 @@ PeerRegistry.prototype.add = function(peer, cb) { peer.status = peer.status || 'connecting'; peer.direction = peer.direction || 'initiator'; - + self.save(peer, function(err) { if (err && cb) { return cb(err); diff --git a/lib/peer_socket.js b/lib/peer_socket.js index ce978e1..3a9b5b1 100644 --- a/lib/peer_socket.js +++ b/lib/peer_socket.js @@ -5,29 +5,165 @@ var url = require('url'); var querystring = require('querystring'); var spdy = require('spdy'); var SpdyAgent = require('./spdy_agent'); +var rels = require('./api_rels'); -var PeerSocket = module.exports = function(ws, appName) { +var PeerSocket = module.exports = function(ws, name, peerRegistry) { EventEmitter.call(this); - this.ws = ws; - this.appName = appName; // peers local id - this.serverId = null; // set after peer registry initializes it from http_server + this.name = name; // peers local id this.agent = null; this.subscriptions = []; + this.connectionId = null; this._pingTimer = null; - - this.init(); + this._pingTimeout = 10 * 1000; + this.peerRegistry = peerRegistry; + + this.init(ws); }; util.inherits(PeerSocket, EventEmitter); -PeerSocket.prototype.onEnd = function() { +PeerSocket.prototype.close = function() { + clearInterval(this._pingTimer); + this.ws.close(); +}; + +PeerSocket.prototype.init = function(ws) { + var self = this; + + if (ws) { + this._initWs(ws); + } + + // delay because ws/spdy may not be fully established + setImmediate(function() { + // setup connection + self._setupConnection(function(err) { + if (err) { + self.close(); + } + + + var subscriptions = self.subscriptions; + self.subscriptions = []; // clear it before resubscribing + // subscribe to all prev subscriptions + subscriptions.forEach(function(event) { + self.subscribe(event); + }); + + self._startPingTimer(); + + self.emit('connected'); + }); + }); +}; + +PeerSocket.prototype._setupConnection = function(cb, tries) { + var self = this; + + var peerItem = { + direction: 'acceptor', + id: self.name, + status: 'connecting' + }; + + self.peerRegistry.add(peerItem, function(err, newPeer) { + if (err) { + return cb(err); + } + + // confirm connection with peer + self.confirmConnection(self.connectionId, function(err) { + if (err) { + return cb(new Error('Failed to confirm connection.')); + } + self._setPeerStatus('connected', cb); + }); + + }); +}; + +PeerSocket.prototype._initWs = function(ws) { + var self = this; + var u = url.parse(ws.upgradeReq.url, true); // parse out connectionId + this.ws = ws; + this.connectionId = u.query.connectionId; + + this.ws._socket.removeAllListeners('data'); // Remove WebSocket data handler. + + this.ws._socket.on('end', function() { + clearInterval(self._pingTimer); + self._setPeerStatus('disconnected'); + self.emit('end'); + }); + + this.ws.on('error', function(err) { + clearInterval(self._pingTimer); + self._setPeerStatus('failed', err); + self.emit('error', err); + }); + + this.agent = spdy.createAgent(SpdyAgent, { + host: this.name, + port: 80, + socket: this.ws._socket, + spdy: { + plain: true, + ssl: false + } + }); + + // TODO: Remove this when bug in agent socket removal is fixed. + this.agent.maxSockets = 150; + this.agent.on('push', this.onPushData.bind(this)); +}; + +PeerSocket.prototype._startPingTimer = function() { + var self = this; clearInterval(this._pingTimer); + this._pingTimer = setInterval(function() { + var timeout = setTimeout(function() { + console.error('PeerSocket PING timeout:', self.name); + self.close(); + }, self._pingTimeout) + + self.agent.ping(function(err) { + if (timeout) { + clearTimeout(timeout); + } + }); + }, self._pingTimeout); + +}; + +PeerSocket.prototype._setPeerStatus = function(status, err, cb) { var self = this; - setTimeout(function() { - if (!self.ws._socket) { - self.emit('end'); + + if (typeof err === 'function') { + cb = err; + err = undefined; + } + + + if (!cb) { + cb = function(){}; + } + + this.peerRegistry.get(this.name, function(err, peer) { + if (err) { + return cb(err); + } + + try { + peer = JSON.parse(peer); + } catch(err) { + return cb(err); } - }, 5 * 60 * 1000); + peer.status = status; + if (err) { + peer.error = err; + } + self.peerRegistry.save(peer, cb); + }); }; PeerSocket.prototype.onPushData = function(stream) { @@ -67,48 +203,6 @@ PeerSocket.prototype.onPushData = function(stream) { }); }; -PeerSocket.prototype.init = function() { - var self = this; - - this.ws._socket.removeAllListeners('data'); // Remove WebSocket data handler. - this.ws._socket.on('end', this.onEnd.bind(this)); - - this.agent = spdy.createAgent(SpdyAgent, { - host: this.appName, - port: 80, - socket: this.ws._socket, - spdy: { - plain: true, - ssl: false - } - }); - - // TODO: Remove this when bug in agent socket removal is fixed. - this.agent.maxSockets = 150; - this.agent.on('push', this.onPushData.bind(this)); - // subscribe to all prev subscriptions - this.subscriptions.forEach(this.subscribe.bind(this)); - - if(this._pingTimer) { - clearInterval(this._pingTimer); - } - - var self = this; - this._pingTimer = setInterval(function() { - var timeout = setTimeout(function() { - console.error('PeerSocket PING timeout:', self.appName); - clearInterval(self._pingTimer); // stop future pings - self.ws.close(); - }, 10 * 1000) - - self.agent.ping(function(err) { - if (timeout) { - clearTimeout(timeout); - } - }); - }, 10 * 1000); -}; - PeerSocket.prototype.subscribe = function(event, cb) { if(!cb) { cb = function() {}; @@ -128,7 +222,7 @@ PeerSocket.prototype.subscribe = function(event, cb) { 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'fog.argo.cx', 'Content-Length': body.length, - 'zetta-forwarded-server': this.serverId + 'zetta-forwarded-server': this.name }, path: '/_pubsub/subscribe', agent: this.agent @@ -157,7 +251,7 @@ PeerSocket.prototype.unsubscribe = function(event, cb) { 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'fog.argo.cx', 'Content-Length': body.length, - 'zetta-forwarded-server': this.serverId + 'zetta-forwarded-server': this.name }, path: '/_pubsub/unsubscribe', agent: this.agent @@ -183,7 +277,7 @@ PeerSocket.prototype.confirmConnection = function(connectionId, callback) { PeerSocket.prototype.transition = function(action, args, cb) { var u = url.parse(action.href); - var path = u.pathname.replace(this.serverId, this.appName); + var path = u.pathname; var body = new Buffer(querystring.stringify(args)); var opts = { @@ -191,7 +285,7 @@ PeerSocket.prototype.transition = function(action, args, cb) { path: path, method: action.method, headers: { - 'zetta-forwarded-server': this.serverId, + 'zetta-forwarded-server': this.name, 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'fog.argo.cx', 'Content-Length': body.length, diff --git a/sample/IHeardDat/index.js b/sample/IHeardDat/index.js index b96c62d..69cc61b 100644 --- a/sample/IHeardDat/index.js +++ b/sample/IHeardDat/index.js @@ -7,6 +7,7 @@ zetta() .expose('*') .use(Arduino) .load(IHeardThat) + .link('http://localhost:3002') .listen(3000, function(err) { if(err) { console.log(err); diff --git a/test/fixture/zetta_test.js b/test/fixture/zetta_test.js index c33dc31..98d03ea 100644 --- a/test/fixture/zetta_test.js +++ b/test/fixture/zetta_test.js @@ -40,13 +40,7 @@ ZettaTest.prototype.server = function(name, scouts, peers) { } server.locatePeer = function(id) { - for (var k in server.httpServer.router) { - if (server.httpServer.router[k] === id) { - return k; - } - } - - return null; + return id; }; server._testPeers = peers || []; @@ -87,8 +81,6 @@ ZettaTest.prototype.run = function(callback) { return err; } - - function check(done) { var allQuery = { match: function() { return true; } @@ -100,16 +92,11 @@ ZettaTest.prototype.run = function(callback) { ret = false; return; } + var pServer = self._serversUrl[peer.url]; - var router = pServer.httpServer.router; - var foundInRouter = false; - for (var k in router){ - if (router[k] === self.servers[name].id) { - foundInRouter = true; - break; - } + if (!pServer.httpServer.peers[name]) { + ret = false; } - ret = foundInRouter; }); done(ret); }); @@ -130,7 +117,6 @@ ZettaTest.prototype.run = function(callback) { next(); } ); - }); }, callback); diff --git a/test/test_api.js b/test/test_api.js index 088d88a..554fe52 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -34,7 +34,6 @@ function checkDeviceOnRootUri(entity) { assert.deepEqual(entity.class, ['device']); assert.deepEqual(entity.rel, ["http://rels.zettajs.io/device"]); - assert(entity.properties.id); assert(entity.properties.name); assert(entity.properties.type); assert(entity.properties.state); @@ -90,7 +89,7 @@ describe('Zetta Api', function() { .expose('*') ._run(done); - url = '/servers/'+app.id; + url = '/servers/'+app._name; }); it('should have content type application/vnd.siren+json', function(done) { @@ -118,7 +117,6 @@ describe('Zetta Api', function() { request(getHttpServer(app)) .get(url) .expect(getBody(function(res, body) { - assert.equal(body.properties.id, app.id); assert.equal(body.properties.name, 'local'); })) .end(done); @@ -410,7 +408,7 @@ describe('Zetta Api', function() { .expose('*') ._run(function() { device = app.runtime._jsDevices[Object.keys(app.runtime._jsDevices)[0]]; - url = '/servers/' + app.id + '/devices/' + device.id; + url = '/servers/' + app._name + '/devices/' + device.id; done(); }); }); diff --git a/test/test_event_broker.js b/test/test_event_broker.js index 8ec5022..68ab6c6 100644 --- a/test/test_event_broker.js +++ b/test/test_event_broker.js @@ -12,6 +12,7 @@ var Registry = require('./fixture/scout_test_mocks').MockRegistry; var Ws = function() { EventEmitter.call(this) this._socket = new net.Socket(); + this.upgradeReq = { url: '/peers/0ac7e9c2-f03f-478c-95f5-2028fc9c2b6e?connectionId=46f466b0-1017-430b-8993-d7a8c896e014'}; }; util.inherits(Ws, EventEmitter); Ws.prototype.send = function(data, options, cb) { @@ -23,18 +24,19 @@ describe('EventBroker', function() { var query = null; var app = null; var broker = null; + var peerRegistry = null; beforeEach(function() { var reg = new Registry(); - var peerRegistry = new PeerRegistry(); + peerRegistry = new PeerRegistry(); app = zetta({ registry: reg, peerRegistry: peerRegistry }); - query = { topic: 'some-topic', serverId: app.id }; + query = { topic: 'some-topic', name: app.id }; broker = new EventBroker(app); }); it('it should add peer by server name', function() { var ws = new Ws(); - var peer = new PeerSocket(ws, 'some-peer'); - peer.serverId = 'some-peer2'; + var peer = new PeerSocket(ws, 'some-peer', peerRegistry); + peer.name = 'some-peer2'; broker.peer(peer); assert.equal(peer, broker.peers['some-peer2']); }); diff --git a/test/test_event_ws_proxied.js b/test/test_event_ws_proxied.js index 59fa8dc..66fffb8 100644 --- a/test/test_event_ws_proxied.js +++ b/test/test_event_ws_proxied.js @@ -10,13 +10,13 @@ describe('Event Websocket Proxied Through Peer', function() { var device = null; beforeEach(function(done) { cluster = zettatest() - .server('cloud') - .server('detroit1', [Scout], ['cloud']) + .server('cloud deploy') + .server('detroit 1', [Scout], ['cloud deploy']) .run(function(err){ - var id = cluster.servers['detroit1'].id; - base = 'localhost:' + cluster.servers['cloud']._testPort + '/servers/' + cluster.servers['cloud'].locatePeer(id); - var did = Object.keys(cluster.servers['detroit1'].runtime._jsDevices)[0]; - device = cluster.servers['detroit1'].runtime._jsDevices[did]; + var id = cluster.servers['detroit 1'].id; + base = 'localhost:' + cluster.servers['cloud deploy']._testPort + '/servers/' + cluster.servers['cloud deploy'].locatePeer(id); + var did = Object.keys(cluster.servers['detroit 1'].runtime._jsDevices)[0]; + device = cluster.servers['detroit 1'].runtime._jsDevices[did]; done(err); }) }); @@ -37,7 +37,6 @@ describe('Event Websocket Proxied Through Peer', function() { it('websocket should connect', function(done) { var url = 'ws://' + base + '/events?topic=testdriver/'+device.id+'/bar'; - console.log(url) var error = 0; var open = false; var socket = new WebSocket(url); diff --git a/test/test_peer_client.js b/test/test_peer_client.js index 2fb6387..b9977ea 100644 --- a/test/test_peer_client.js +++ b/test/test_peer_client.js @@ -2,7 +2,7 @@ var PeerClient = require('../lib/peer_client'); var assert = require('assert'); -var MockServer = {id: '1234', httpServer: { spdyServer: {}}}; +var MockServer = { _name: '1234', httpServer: { spdyServer: {}}, log: {}}; var urlEndingWithSlash = 'http://cloud.zettajs.io/'; var urlEndingWithNoSlash = 'http://cloud.zettajs.io'; diff --git a/test/test_peer_registry.js b/test/test_peer_registry.js index 7b1aa6e..a4ddff1 100644 --- a/test/test_peer_registry.js +++ b/test/test_peer_registry.js @@ -83,7 +83,7 @@ describe('Peer Registry', function() { describe('#add', function() { it('should save new peers', function(done) { var reg = new PeerRegistry(db); - var peer = {}; + var peer = {id: 'someid'}; reg.add(peer, function(err, result) { assert.ok(result); @@ -93,7 +93,7 @@ describe('Peer Registry', function() { it('should generate an ID for new peers', function(done) { var reg = new PeerRegistry(db); - var peer = {}; + var peer = {id: 'someid'}; reg.add(peer, function(err, result) { assert.ok(result.id); @@ -115,7 +115,7 @@ describe('Peer Registry', function() { it('propagates errors from #find', function(done) { var reg = new PeerRegistry(db); - var peer = {}; + var peer = {id: 'someid'}; reg.find = function(key, cb) { cb(new Error()); @@ -144,8 +144,8 @@ describe('Peer Registry', function() { it('it should not match entries when both .url are undefined or null', function(done) { var reg = new PeerRegistry(db); - var peer1 = { name: 'some-peer-1'}; - var peer2 = { name: 'some-peer-2'}; + var peer1 = { id: 'some-peer-1'}; + var peer2 = { id: 'some-peer-2'}; reg.add(peer1, function(err, result1) { assert.ok(result1.id); diff --git a/test/test_virtual_device.js b/test/test_virtual_device.js index 2b21d00..f075f2b 100644 --- a/test/test_virtual_device.js +++ b/test/test_virtual_device.js @@ -29,9 +29,14 @@ describe('Virtual Device', function() { .server('detroit1', [Scout], ['cloud']) .run(function(err){ if (err) { - return cb(err); + return done(err); } - socket = cluster.servers['cloud'].httpServer.peers[0]; + + socket = cluster.servers['cloud'].httpServer.peers['detroit1']; + if (!socket) { + done(new Error('socket not found')); + } + var did = Object.keys(cluster.servers['detroit1'].runtime._jsDevices)[0]; device = cluster.servers['detroit1'].runtime._jsDevices[did]; var id = cluster.servers['detroit1'].id; @@ -100,6 +105,43 @@ describe('Virtual Device', function() { }); }); + it('call should work with arguments, after peer reconnects', function(done) { + vdevice.call('test', 'hello', function(err) { + assert.equal(err, null); + }); + var timer = setTimeout(function() { + done(new Error('Faied to recv transition call on detroit device')); + }, 100); + + var recv = 0; + device.on('test', function() { + recv++; + + if (recv === 1) { + clearTimeout(timer); + assert.equal(device.value, 'hello'); + + var socket = cluster.servers['cloud'].httpServer.peers['detroit1']; + socket.close(); + + setTimeout(function() { + vdevice.call('test', 'hello1', function(err) { + assert.equal(err, null); + }); + var timer = setTimeout(function() { + done(new Error('Faied to recv transition call on detroit device')); + }, 100); + + device.on('test', function() { + clearTimeout(timer); + assert.equal(device.value, 'hello1'); + done(); + }); + }, 100); + } + }); + }); + }); describe('Device log monitor stream', function() { diff --git a/zetta.js b/zetta.js index 644a7be..cd71db6 100644 --- a/zetta.js +++ b/zetta.js @@ -21,8 +21,8 @@ var Zetta = module.exports = function(opts) { opts = opts || {}; - this.id = uuid.v4(); // unique id of server this._name = os.hostname(); // optional name, defaults to OS hostname + this.id = this._name; this._exposeQuery = ''; this._scouts = []; @@ -50,6 +50,7 @@ var Zetta = module.exports = function(opts) { Zetta.prototype.name = function(name) { this._name = name; + this.id = this._name; return this; }; @@ -308,7 +309,7 @@ Zetta.prototype._initPeers = function(callback) { self.peerRegistry.save(peer); // peer-event - self.pubsub.publish('_peer/connect', { peer: peerClient, id: peer.id }); + self.pubsub.publish('_peer/connect', { peer: peerClient}); }); peerClient.on('error', function(error) { @@ -319,7 +320,7 @@ Zetta.prototype._initPeers = function(callback) { self.peerRegistry.save(result); // peer-event - self.pubsub.publish('_peer/disconnect', { peer: peerClient, id: peer.id }); + self.pubsub.publish('_peer/disconnect', { peer: peerClient }); }); }); @@ -329,7 +330,7 @@ Zetta.prototype._initPeers = function(callback) { result.status = 'disconnected'; // peer-event - self.pubsub.publish('_peer/disconnect', { peer: peerClient, id: peer.id }); + self.pubsub.publish('_peer/disconnect', { peer: peerClient }); self.peerRegistry.save(result, function() { // re-connect