From 883e712a2f8659601fb7921c6e3ad2f3cb48dc51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Ma=C5=82ecki?= Date: Fri, 9 Dec 2011 20:19:35 +0100 Subject: [PATCH] [api] First pass at Worker integration --- lib/forever.js | 131 +++++++++---------------------------------------- 1 file changed, 23 insertions(+), 108 deletions(-) diff --git a/lib/forever.js b/lib/forever.js index d803e54b..33f7c1d1 100644 --- a/lib/forever.js +++ b/lib/forever.js @@ -10,9 +10,7 @@ var fs = require('fs'), path = require('path'), events = require('events'), exec = require('child_process').exec, - net = require('net'), cliff = require('cliff'), - daemon = require('daemon'), nconf = require('nconf'), portfinder = require('portfinder'), timespan = require('timespan'), @@ -21,6 +19,7 @@ var fs = require('fs'), utile = require('utile'), mkdirp = utile.mkdirp, async = utile.async, + nssocket = require('nssocket'), winston = require('winston'); var forever = exports; @@ -44,6 +43,7 @@ forever.initialized = false; forever.root = path.join(process.env.HOME || '/root', '.forever'); forever.config = new nconf.File({ file: path.join(forever.root, 'config.json') }); forever.Forever = forever.Monitor = require('./forever/monitor').Monitor; +forever.Worker = require('./forever/worker').Worker; forever.cli = require('./forever/cli'); // @@ -90,65 +90,23 @@ function getSockets(sockPath, callback) { // Returns all data for processes managed by forever. // function getAllProcesses(callback) { - var sockPath = forever.config.get('sockPath'), - results = []; + var sockPath = forever.config.get('sockPath'); function getProcess(name, next) { var fullPath = path.join(sockPath, name), - socket = new net.Socket({ type: 'unix' }), - parsed = false, - data = ''; + socket = new nssocket.NsSocket(); - function tryParse() { - if (!parsed) { - parsed = true; - - var monitors; - try { - monitors = JSON.parse(data); - } - catch (ex) { - // - // Ignore errors - // - } - - // - // Be a little lazier about loading results - // - if (monitors && monitors.monitors) { - results = results.concat(monitors.monitors); - } - - next(); - } - } - - socket.on('error', function (err) { - if (err.code === 'ECONNREFUSED') { - try { - fs.unlinkSync(fullPath); - } - catch (ex) { } - return tryParse(); - } - else if (err.code === 'EACCES') { - forever.log.warn('Error contacting: ' + fullPath.magenta); - } - else { - forever.log.error('Unknown error (' + err.code + ') when contacting: ' + fullPath.magenta); + socket.connect(fullPath, function (err) { + if (err) { + return next(err); } - tryParse(); - }); - - socket.on('data', function (msg) { - data += msg; + socket.data(['data'], function (data) { + next(null, data); + socket.end(); + }); + socket.send(['data']); }); - - socket.on('close', tryParse); - - socket.connect(fullPath); } getSockets(sockPath, function (err, sockets) { @@ -156,8 +114,8 @@ function getAllProcesses(callback) { return callback(err); } - async.forEach(sockets, getProcess, function () { - callback(results); + async.map(sockets, getProcess, function (err, processes) { + callback(processes); }); }); } @@ -339,29 +297,11 @@ forever.startDaemon = function (script, options) { options = options || {}; options.uid = options.uid || utile.randomString(4).replace(/^\-/, '_'); options.logFile = forever.logFilePath(options.logFile || options.uid + '.log'); - options.pidFile = forever.pidFilePath(options.pidFile || options.uid + '.pid'); var monitor = new forever.Monitor(script, options); + monitor.start(); - fs.open(options.logFile, options.appendLog ? 'a+' : 'w+', function (err, fd) { - if (err) { - return monitor.emit('error', err); - } - - var pid = daemon.start(fd); - daemon.lock(options.pidFile); - - // - // Remark: This should work, but the fd gets screwed up - // with the daemon process. - // - // process.on('exit', function () { - // fs.unlinkSync(options.pidFile); - // }); - - process.pid = pid; - monitor.start(); - }); + var worker = new forever.Worker({ monitor: monitor }); return monitor; }; @@ -374,10 +314,8 @@ forever.startDaemon = function (script, options) { // forever.startServer = function () { var args = Array.prototype.slice.call(arguments), - socket = path.join(forever.config.get('sockPath'), 'forever.sock'), monitors = [], - callback, - server; + callback; args.forEach(function (a) { if (Array.isArray(a)) { @@ -393,36 +331,13 @@ forever.startServer = function () { } }); - server = net.createServer(function (socket) { - // - // Write the specified data and close the socket - // - socket.end(JSON.stringify({ - monitors: monitors.map(function (m) { - return m.data; - }) - })); - }); - - function onError(err) { - monitors.forEach(function (mon) { - mon.emit('error', err); - }); - } - - portfinder.getSocket({ path: socket }, function (err, socket) { - if (err) { - return onError(err); - } - - server.on('error', onError); - - server.listen(socket, function () { - if (callback) { - callback(null, server, socket); - } + async.forEach(monitors, function (monitor, next) { + var worker = new forever.Worker({ + monitor: monitor, + sockPath: forever.config.get('sockPath') }); - }); + worker.start(next); + }, callback || function () {}); };