Skip to content

Commit

Permalink
fix(worker): better closing when disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Feb 16, 2022
1 parent ccd4537 commit 41b9404
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
84 changes: 84 additions & 0 deletions lib/p-timeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Extracted from p-timeout https://github.com/sindresorhus/p-timeout
// as it is not commonjs compatible. This is vesion 5.0.2
'use strict';

class TimeoutError extends Error {
constructor(message) {
super(message);
this.name = 'TimeoutError';
}
}

module.exports.TimeoutError = TimeoutError;

module.exports.pTimeout = function pTimeout(
promise,
milliseconds,
fallback,
options
) {
let timer;
const cancelablePromise = new Promise((resolve, reject) => {
if (typeof milliseconds !== 'number' || Math.sign(milliseconds) !== 1) {
throw new TypeError(
`Expected \`milliseconds\` to be a positive number, got \`${milliseconds}\``
);
}

if (milliseconds === Number.POSITIVE_INFINITY) {
resolve(promise);
return;
}

options = {
customTimers: { setTimeout, clearTimeout },
...options
};

timer = options.customTimers.setTimeout.call(
undefined,
() => {
if (typeof fallback === 'function') {
try {
resolve(fallback());
} catch (error) {
reject(error);
}

return;
}

const message =
typeof fallback === 'string'
? fallback
: `Promise timed out after ${milliseconds} milliseconds`;
const timeoutError =
fallback instanceof Error ? fallback : new TimeoutError(message);

if (typeof promise.cancel === 'function') {
promise.cancel();
}

reject(timeoutError);
},
milliseconds
);

(async () => {
try {
resolve(await promise);
} catch (error) {
reject(error);
} finally {
options.customTimers.clearTimeout.call(undefined, timer);
}
})();
});

cancelablePromise['clear'] = () => {
clearTimeout(timer);
timer = undefined;
};

return cancelablePromise;
};
5 changes: 2 additions & 3 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const utils = require('./utils');

const TimerManager = require('./timer-manager');
const { promisify } = require('util');
const pTimeout = require('p-timeout');
const { pTimeout } = require('./p-timeout');
const semver = require('semver');
const debuglog = require('debuglog')('bull');
const uuid = require('uuid');
Expand Down Expand Up @@ -538,7 +538,7 @@ async function redisClientDisconnect(client) {
}
}),
500
).catch(() => {
).finally(() => {
client.once('error', _reject);
client.disconnect();
if (['connecting', 'reconnecting'].includes(client.status)) {
Expand Down Expand Up @@ -847,7 +847,6 @@ Queue.prototype.pause = function(isLocal, doNotWaitActive) {
this.bclient.connect()
);
}

return this.whenCurrentJobsFinished();
} else {
return scripts.pause(this, true);
Expand Down

0 comments on commit 41b9404

Please sign in to comment.