From 41b940457b3447619c3c2887674a8cebf1508b07 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Wed, 16 Feb 2022 18:06:43 +0800 Subject: [PATCH] fix(worker): better closing when disconnected --- lib/p-timeout.js | 84 ++++++++++++++++++++++++++++++++++++++++++++++++ lib/queue.js | 5 ++- 2 files changed, 86 insertions(+), 3 deletions(-) create mode 100644 lib/p-timeout.js diff --git a/lib/p-timeout.js b/lib/p-timeout.js new file mode 100644 index 000000000..37af71177 --- /dev/null +++ b/lib/p-timeout.js @@ -0,0 +1,84 @@ +// Extracted from p-timeout https://github.com/sindresorhus/p-timeout +// as it is not commonjs compatible. This is vesion 5.0.2 +'use strict'; + +class TimeoutError extends Error { + constructor(message) { + super(message); + this.name = 'TimeoutError'; + } +} + +module.exports.TimeoutError = TimeoutError; + +module.exports.pTimeout = function pTimeout( + promise, + milliseconds, + fallback, + options +) { + let timer; + const cancelablePromise = new Promise((resolve, reject) => { + if (typeof milliseconds !== 'number' || Math.sign(milliseconds) !== 1) { + throw new TypeError( + `Expected \`milliseconds\` to be a positive number, got \`${milliseconds}\`` + ); + } + + if (milliseconds === Number.POSITIVE_INFINITY) { + resolve(promise); + return; + } + + options = { + customTimers: { setTimeout, clearTimeout }, + ...options + }; + + timer = options.customTimers.setTimeout.call( + undefined, + () => { + if (typeof fallback === 'function') { + try { + resolve(fallback()); + } catch (error) { + reject(error); + } + + return; + } + + const message = + typeof fallback === 'string' + ? fallback + : `Promise timed out after ${milliseconds} milliseconds`; + const timeoutError = + fallback instanceof Error ? fallback : new TimeoutError(message); + + if (typeof promise.cancel === 'function') { + promise.cancel(); + } + + reject(timeoutError); + }, + milliseconds + ); + + (async () => { + try { + resolve(await promise); + } catch (error) { + reject(error); + } finally { + options.customTimers.clearTimeout.call(undefined, timer); + } + })(); + }); + + cancelablePromise['clear'] = () => { + clearTimeout(timer); + timer = undefined; + }; + + return cancelablePromise; +}; diff --git a/lib/queue.js b/lib/queue.js index b60ec2315..f8266eed9 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -16,7 +16,7 @@ const utils = require('./utils'); const TimerManager = require('./timer-manager'); const { promisify } = require('util'); -const pTimeout = require('p-timeout'); +const { pTimeout } = require('./p-timeout'); const semver = require('semver'); const debuglog = require('debuglog')('bull'); const uuid = require('uuid'); @@ -538,7 +538,7 @@ async function redisClientDisconnect(client) { } }), 500 - ).catch(() => { + ).finally(() => { client.once('error', _reject); client.disconnect(); if (['connecting', 'reconnecting'].includes(client.status)) { @@ -847,7 +847,6 @@ Queue.prototype.pause = function(isLocal, doNotWaitActive) { this.bclient.connect() ); } - return this.whenCurrentJobsFinished(); } else { return scripts.pause(this, true);