Skip to content

Commit

Permalink
Added optional PeerSocket options to set timeouts (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamMagaluk authored Oct 5, 2016
1 parent 6669c66 commit d3b3727
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
3 changes: 2 additions & 1 deletion lib/http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {
this.eventBroker = new EventBroker(zettaInstance);
this.clients = {};
this.peers = {}; // connected peers
this.peerOptions = {}; // default empty options for PeerSocket

this._deviceQueries = [];

Expand Down Expand Up @@ -301,7 +302,7 @@ ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
// peer has been disconnected but has connected before.
self.peers[name].init(ws);
} else {
var peer = new PeerSocket(ws, name, self.peerRegistry);
var peer = new PeerSocket(ws, name, self.peerRegistry, self.peerOptions);
self.peers[name] = peer;

// Events coming from the peers pubsub using push streams
Expand Down
11 changes: 8 additions & 3 deletions lib/peer_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@ var STATES = {
'CONNECTED': 2
};

var PeerSocket = module.exports = function(ws, name, peerRegistry) {
var PeerSocket = module.exports = function(ws, name, peerRegistry, opts) {
EventEmitter.call(this);

if (!opts) {
opts = {};
}

var self = this;
this.state = STATES.DISCONNECTED;
this.name = name; // peers local id
this.agent = null;
this.subscriptions = {}; // { <topic>: <subscribed_count> }
this.connectionId = null;
this._pingTimer = null;
this._pingTimeout = 10 * 1000;
this._pingTimeout = Number(opts.pingTimeout) || (10 * 1000);
this._confirmationTimeout = Number(opts.confirmationTimeout) || 10 * 1000;
this.peerRegistry = peerRegistry;
this.logger = new Logger();

Expand Down Expand Up @@ -360,7 +365,7 @@ PeerSocket.prototype.confirmConnection = function(connectionId, callback) {
var timeout = setTimeout(function() {
req.abort();
callback(new Error('Confirm connection timeout reached.'));
}, this._pingTimeout);
}, this._confirmationTimeout);

var opts = { agent: this.agent, path: '/_initiate_peer/' + connectionId };
var req = http.get(opts, function(res) {
Expand Down
23 changes: 23 additions & 0 deletions test/test_zetta.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,29 @@ describe('Zetta', function() {
});
});

it('peerOptions in httpServer should update options in PeerSockets', function(done) {
var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
z.silent();
z.use(function(server) {
server.httpServer.peerOptions = {
pingTimeout: 4321,
confirmationTimeout: 1234
};
server.pubsub.subscribe('_peer/connect', function(topic, data) {
assert.equal(data.peer._pingTimeout, 4321);
assert.equal(data.peer._confirmationTimeout, 1234);
done();
})
})
z.listen(0, function() {
var port = z.httpServer.server.address().port;
zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
.silent()
.link('http://localhost:' + port)
.listen(0);
})
})

it('.link should not add to peers', function(done){

peerRegistry.db.put('1234567', JSON.stringify({id: '1234567', direction: 'initiator', url: 'http://example.com/', fromLink: true}), function(err){
Expand Down

0 comments on commit d3b3727

Please sign in to comment.