Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Upgrade [email protected] and [email protected] #360

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8ebeab2
First at upgrading [email protected] and [email protected]
AdamMagaluk Nov 30, 2017
2fbfde9
Spdy/h2 ping working using forked version of `spdy`.
AdamMagaluk Dec 3, 2017
9a9e962
Add extra upgrade header data back to socket for spdy to parse
AdamMagaluk Feb 19, 2018
6639eed
Fix virtual device subscription causing socket hang up
AdamMagaluk Apr 18, 2018
70c95ac
Update Travis to remove 0.10, add 6,8,10
AdamMagaluk May 2, 2018
243df26
Changed custom spdy install to github:
AdamMagaluk May 2, 2018
ccf8321
Updates to fix streams closing the connection.
AdamMagaluk Nov 24, 2018
bdfe0b8
Fix isssue with reconnects not passing proper connectionId to peer cl…
AdamMagaluk Dec 4, 2018
0718688
Fix failing test where an embedded ws is expected to return 404.
AdamMagaluk Dec 8, 2018
04e4670
PeerSocket fix issue where a closing the peer conneciton causes subsc…
AdamMagaluk Dec 8, 2018
c1be114
Add TODO to skipped test that tests spdy errors on the agent
AdamMagaluk Dec 8, 2018
cd82209
Minor cleanup and comments
AdamMagaluk Dec 8, 2018
dbdb4b0
Upgrade dependancies for colors, uuid
AdamMagaluk Dec 8, 2018
4c9af90
Upgraded deps
AdamMagaluk Dec 8, 2018
792be3d
HTTP should return a JSON error message when a device does not exist.
AdamMagaluk Dec 9, 2018
cf7655c
Cleanup domain in test to ensure it does not catch unrelated errors.
AdamMagaluk Dec 9, 2018
179ec30
Add --exit to mocha to be compatible with old style in 1.x
AdamMagaluk Dec 9, 2018
a30d4d9
Handle spdy Connection errors in the PeerSocket
AdamMagaluk Dec 9, 2018
a85cd7b
Drop travis support for 0.12
AdamMagaluk Dec 11, 2018
457e809
Merge branch 'master' into spdy-upgrade
AdamMagaluk Dec 11, 2018
df6b0ad
Remove use of ES6 default parameter.
AdamMagaluk Dec 11, 2018
8840b62
Drop support for node v4 in .travis.
AdamMagaluk Dec 11, 2018
c7afa12
Merge branch 'spdy-upgrade' of github.com:zettajs/zetta into spdy-upg…
AdamMagaluk Dec 11, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
language: node_js
node_js:
- "0.12"
- "4"
- "6"
- "8"
- "10"
Expand Down
21 changes: 17 additions & 4 deletions lib/api_resources/servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ var streams = require('zetta-streams');
var ObjectStream = streams.ObjectStream;
var ActionError = require('zetta-device').ActionError;

// Returns the JSON error for when a device does not exist.
const DeviceDoesNotExistError = function(env, deviceNotFound) {
return {
class: ['error'],
properties: {
message: `Device ${deviceNotFound} does not exist.`
},
links: [
{ rel: ['self'], href: env.helpers.url.current() }
]
};
};

var ServerResource = module.exports = function(server) {
this.server = server;
this.httpScout = this.server.httpScout;
Expand Down Expand Up @@ -316,7 +329,7 @@ ServerResource.prototype.destroyDevice = function(env, next) {

var device = this.server.runtime._jsDevices[env.route.params.deviceId];
if(!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand Down Expand Up @@ -358,7 +371,7 @@ ServerResource.prototype.showDevice = function(env, next) {

var device = this.server.runtime._jsDevices[env.route.params.deviceId];
if(!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand All @@ -383,7 +396,7 @@ ServerResource.prototype.updateDevice = function(env, next) {
var device = this.server.runtime._jsDevices[env.route.params.deviceId];

if (!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand Down Expand Up @@ -444,7 +457,7 @@ ServerResource.prototype.deviceAction = function(env, next) {

var device = this.server.runtime._jsDevices[env.route.params.deviceId];
if(!device) {
env.response.body = 'Device does not exist';
env.response.body = DeviceDoesNotExistError(env, env.route.params.deviceId);
env.response.statusCode = 404;
return next(env);
}
Expand Down
84 changes: 52 additions & 32 deletions lib/http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,16 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {

// external http(s) server
var httpOptions = {
windowSize: 1024 * 1024
connection: {
windowSize: 1024 * 1024,
autoSpdy31: false
},
spdy: {
plain: true,
ssl: false
}
};

var tlsCheckOptions = ['cert', 'key', 'pfx', 'ca'];
var usingSSL = false;
Object.keys(options).forEach(function(k) {
Expand All @@ -66,16 +74,29 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {
});

// If any tls options were specified, use ssl and not plain
httpOptions.plain = (usingSSL) ? false : true;
httpOptions.ssl = (usingSSL) ? true : false;
this.server = spdy.createServer(httpOptions);
httpOptions.spdy.plain = (usingSSL) ? false : true;
httpOptions.spdy.ssl = (usingSSL) ? true : false;

var spdyServerOpts = {
connection: {
windowSize: 1024 * 1024,
autoSpdy31: false
},
spdy: {
plain: true,
ssl: false
}
};

// Outside http server
this.server = spdy.createServer(httpOptions);

// internal server for z2z, allways ssl: false, plain: true
this.spdyServer = spdy.createServer({
windowSize: 1024 * 1024,
plain: true,
ssl: false
});
// TODO: remove this as it is unneeded now.
this.spdyServer = spdy.createServer(spdyServerOpts);
this.spdyServer.on('ping', function(socket) {
socket.emit('spdyPing');
})

var ValidWSUrls = [
/^\/events$/, // /events
Expand All @@ -93,7 +114,6 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {

this.wss = new WebSocketServer({ noServer: true });
this.server.on('upgrade', function(request, socket, headers) {

var sendError = function(code) {
// Check any custom websocket paths from extentions
var finish = function() {
Expand Down Expand Up @@ -125,7 +145,7 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {

// Handle Peer Request
self.wss.handleUpgrade(request, socket, headers, function(ws) {
self.setupPeerSocket(ws);
self.setupPeerSocket(ws, request);
});
});
} else if (match(request)) {
Expand All @@ -137,15 +157,15 @@ var ZettaHttpServer = module.exports = function(zettaInstance, options) {
}

self.wss.handleUpgrade(request, socket, headers, function(ws) {
if (ws.upgradeReq.url === '/peer-management') {
if (request.url === '/peer-management') {
var query = [
{ name: self.zetta.id, topic: '_peer/connect' },
{ name: self.zetta.id, topic: '_peer/disconnect' }];

var client = new EventSocket(ws, query);
self.eventBroker.client(client);
} else {
self.setupEventSocket(ws);
self.setupEventSocket(ws, request);
}
});
});
Expand Down Expand Up @@ -255,15 +275,15 @@ function getCurrentProtocol(req) {
return protocol;
}

ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, request, host, p) {
ws._env = { helpers: {}};
ws._loader = { path: p };

ws._env.uri = function() {
var protocol = getCurrentProtocol(ws.upgradeReq);
var protocol = getCurrentProtocol(request);

if (!host) {
var address = ws.upgradeReq.connection.address();
var address = request.connection.address();
host = address.address;
if (address.port) {
if (!(protocol === 'https' && address.port === 443) &&
Expand All @@ -272,7 +292,7 @@ ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
}
}
}
return (protocol + '://' + path.join(host, ws.upgradeReq.url)).replace(/\\/g, '/');
return (protocol + '://' + path.join(host, request.url)).replace(/\\/g, '/');
};

ws._env.helpers.url = {};
Expand All @@ -284,24 +304,24 @@ ZettaHttpServer.prototype.wireUpWebSocketForEvent = function(ws, host, p) {
};
};

ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
ZettaHttpServer.prototype.setupPeerSocket = function(ws, request) {
var self = this;
var name = /^\/peers\/(.+)$/.exec(url.parse(ws.upgradeReq.url, true).pathname)[1];
var name = /^\/peers\/(.+)$/.exec(url.parse(request.url, true).pathname)[1];
name = decodeURI(name);
self.zetta.log.emit('log', 'http_server', 'Websocket connection for peer "' + name + '" established.');

// Include ._env and ._loader on websocket, allows argo formatters to work used in virtual_device build actions.
var host = ws.upgradeReq.headers['host']
self.wireUpWebSocketForEvent(ws, host, '/servers/' + name);
var host = request.headers['host']
self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + name);

if (self.peers[name] && self.peers[name].state !== PeerSocket.DISCONNECTED) {
// peer already connected or connecting
ws.close(4000, 'peer already connected');
} else if (self.peers[name]) {
// peer has been disconnected but has connected before.
self.peers[name].init(ws);
self.peers[name].init(ws, request);
} else {
var peer = new PeerSocket(ws, name, self.peerRegistry, self.peerOptions);
var peer = new PeerSocket(ws, request, name, self.peerRegistry, self.peerOptions);
self.peers[name] = peer;

// Events coming from the peers pubsub using push streams
Expand All @@ -327,13 +347,13 @@ ZettaHttpServer.prototype.setupPeerSocket = function(ws) {
}
};

ZettaHttpServer.prototype.setupEventSocket = function(ws) {
ZettaHttpServer.prototype.setupEventSocket = function(ws, request) {
var self = this;
var host = ws.upgradeReq.headers['host'];
var host = request.headers['host'];

if (/^\/events/.exec(ws.upgradeReq.url)) {
self.wireUpWebSocketForEvent(ws, host, '/servers/' + self.zetta._name);
var parsed = url.parse(ws.upgradeReq.url, true);
if (/^\/events/.exec(request.url)) {
self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + self.zetta._name);
var parsed = url.parse(request.url, true);
var query = parsed.query;

if(!query.topic) {
Expand Down Expand Up @@ -386,18 +406,18 @@ ZettaHttpServer.prototype.setupEventSocket = function(ws) {

self.zetta.pubsub.subscribe('_peer/connect', subscribeOnPeerConnect);
} else {
var match = /^\/servers\/(.+)\/events/.exec(ws.upgradeReq.url);
var match = /^\/servers\/(.+)\/events/.exec(request.url);
if(!match) {
ws.close(1001); // go away status code
return;
}

var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
var query = querystring.parse(url.parse(request.url).query);
query.serverId = match[1]; // set serverId on query

self.wireUpWebSocketForEvent(ws, host, '/servers/' + query.serverId);
self.wireUpWebSocketForEvent(ws, request, host, '/servers/' + query.serverId);

var query = querystring.parse(url.parse(ws.upgradeReq.url).query);
var query = querystring.parse(url.parse(request.url).query);
query.name = decodeURI(match[1]);

if (query.topic) {
Expand Down
11 changes: 3 additions & 8 deletions lib/peer_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ var spdy = require('spdy');
var Logger = require('./logger');
var WebSocket = require('./web_socket');

// monkey patch spdy connection to get access to ping event
var originalPingHandler = spdy.Connection.prototype._handlePing;
spdy.Connection.prototype._handlePing = function() {
this.socket.emit('spdyPing', this);
originalPingHandler.apply(this, arguments);
};

function calculatePeerUrl(url, name){
var wsUrl = url.replace(/^http/, 'ws');
var peerPath = '/peers/' + name;
Expand Down Expand Up @@ -130,10 +123,12 @@ PeerClient.prototype._createSocket = function() {
self.checkServerReq();
self.emit('connecting');
self.server.emit('connection', socket);
socket.on('spdyPing', function(connection) {

socket.on('spdyPing', function() {
// reset ping timer on a spdy ping from the peer
self._resetPingTimeout();
});

self.log.emit('log', 'peer-client', 'WebSocket to peer established (' + self.url + ')');
});

Expand Down
Loading