diff --git a/lib/queue/worker.js b/lib/queue/worker.js index d89a7111..4b5e2c9c 100644 --- a/lib/queue/worker.js +++ b/lib/queue/worker.js @@ -151,53 +151,63 @@ Worker.prototype.failed = function (job, err, fn) { Worker.prototype.process = function (job, fn) { var self = this , start = new Date(); - job.active( function(){ - fn( - job, - function (err, result) { - if (err) { - return self.failed(job, err, fn); - } - job.set('duration', job.duration = new Date - start); - if( result ) { - try{ - job.result = result; - job.set('result', JSON.stringify(result)); - }catch(e){ - job.set('result', JSON.stringify({error: true, message:'Invalid JSON Result: "' + result + '"' })); + var domain = require('domain').create(); + domain.on('error', function(err){ + self.failed(job, err, fn); + }); + + domain.run(function() { + job.active(function () { + fn( + job, + function (err, result) { + if (err) { + return self.failed(job, err, fn); } - } - job.complete( function(){ - job.attempt( function(){ - self.emit('job complete', job); - events.emit(job.id, 'complete', result); - if( job.removeOnComplete() ) { - job.remove(); + job.set('duration', job.duration = new Date - start); + if (result) { + try { + job.result = result; + job.set('result', JSON.stringify(result)); + } catch (e) { + job.set('result', JSON.stringify({ + error: true, + message: 'Invalid JSON Result: "' + result + '"' + })); + } + } + job.complete(function () { + job.attempt(function () { + self.emit('job complete', job); + events.emit(job.id, 'complete', result); + if (job.removeOnComplete()) { + job.remove(); + } + }); + }.bind(this)); + self.start(fn); + }, { + /** + * @author behrad + * @pause: let the processor to tell worker not to continue processing new jobs + */ + pause: function (fn, timeout) { + timeout = timeout || 5000; + self.queue.shutdown(fn, Number(timeout), self.type); + }, + /** + * @author behrad + * @pause: let the processor to trigger restart for they job processing + */ + resume: function () { + if (self.resume()) { + self.start(fn); } - }); - }.bind(this)); - self.start(fn); - },{ - /** - * @author behrad - * @pause: let the processor to tell worker not to continue processing new jobs - */ - pause: function( fn, timeout ){ - timeout = timeout || 5000; - self.queue.shutdown( fn, Number(timeout), self.type); - }, - /** - * @author behrad - * @pause: let the processor to trigger restart for they job processing - */ - resume: function () { - if (self.resume()) { - self.start(fn); } } - } - ); - }.bind(this)); + ); + }.bind(this)); + }); return this; }; diff --git a/test/test.js b/test/test.js index e7505118..d8b5aa36 100755 --- a/test/test.js +++ b/test/test.js @@ -32,6 +32,25 @@ describe('Jobs', function () { }); }); + it('should catch uncatched exception and mark job as failed', function(testDone) { + var catched = false; + jobs.create('failedJob', {}).on('complete', function() { + throw new Error('Job should be marked as failed and not complete'); + }).on('failed', function() { + catched.should.be.equal(true); + testDone(); + }).save(); + + jobs.process('failedJob', 1, function() { + try { + throw new Error('this should be catched') + } catch (err) { + catched = true; + } + throw new Error('toto'); + }); + }); + it('should retry on failure if attempts is set', function (testDone) { var job = jobs.create('failure-attempts', {}); var failures = 0;