Skip to content

Commit

Permalink
fix: better handling of maxRetriesPerRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Feb 23, 2022
1 parent 848378d commit d3b9138
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 97 deletions.
2 changes: 1 addition & 1 deletion lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module.exports.Messages = {
RETRY_JOB_IS_LOCKED: "Couldn't retry job: The job is locked",
RETRY_JOB_NOT_FAILED:
"Couldn't retry job: The job has been already retried or has not failed",
MISSING_REDIS_OPTS: `Using a redis instance with enableReadyCheck or maxRetriesPerRequest is not permitted.
MISSING_REDIS_OPTS: `Using a redis instance with enableReadyCheck or maxRetriesPerRequest for bclient/subscriber is not permitted.
see https://github.com/OptimalBits/bull/issues/1873
`
};
40 changes: 25 additions & 15 deletions lib/queue.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const redis = require('ioredis');
const Redis = require('ioredis');
const EventEmitter = require('events');

const _ = require('lodash');
Expand Down Expand Up @@ -120,9 +120,8 @@ const Queue = function Queue(name, url, opts) {
this.token = uuid.v4();

opts.redis = {
...opts.redis,
maxRetriesPerRequest: null,
enableReadyCheck: false
enableReadyCheck: false,
...opts.redis
};

_.defaults(opts.redis, {
Expand Down Expand Up @@ -272,7 +271,11 @@ function redisClientGetter(queue, options, initCallback) {
const createClient = _.isFunction(options.createClient)
? options.createClient
: function(type, config) {
return new redis(config);
if (['bclient', 'subscriber'].includes(type)) {
return new Redis({ ...config, maxRetriesPerRequest: null });
} else {
return new Redis(config);
}
};

const connections = {};
Expand All @@ -289,7 +292,10 @@ function redisClientGetter(queue, options, initCallback) {

const opts = client.options.redisOptions || client.options;

if (opts.enableReadyCheck || opts.maxRetriesPerRequest) {
if (
['bclient', 'subscriber'].includes(type) &&
(opts.enableReadyCheck || opts.maxRetriesPerRequest)
) {
throw new Error(errors.Messages.MISSING_REDIS_OPTS);
}

Expand Down Expand Up @@ -542,13 +548,18 @@ async function redisClientDisconnect(client) {
}
}),
500
).finally(() => {
client.once('error', _reject);
client.disconnect();
if (['connecting', 'reconnecting'].includes(client.status)) {
resolve();
}
});
)
.catch(() => {
// Ignore timeout error
})
.finally(() => {
client.once('error', _reject);

client.disconnect();
if (['connecting', 'reconnecting'].includes(client.status)) {
resolve();
}
});
}).finally(() => {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
Expand All @@ -572,8 +583,7 @@ Queue.prototype.close = function(doNotWaitJobs) {

return (this.closing = this.isReady()
.then(this._initializingProcess)
.catch(err => {
console.error(err);
.catch(() => {
isReady = false;
})
.then(() => isReady && this.pause(true, doNotWaitJobs))
Expand Down
115 changes: 48 additions & 67 deletions test/test_connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@
const expect = require('expect.js');
const utils = require('./utils');
const { isRedisReady } = require('../lib/utils');
const redis = require('ioredis');
const Redis = require('ioredis');
const Queue = require('../lib/queue');

describe('connection', () => {
let queue;
let client;

beforeEach(() => {
client = new redis();
return client.flushdb().then(() => {
queue = utils.buildQueue();
return queue;
});
client = new Redis();
return client.flushdb();
});

afterEach(() => {
Expand All @@ -24,55 +20,41 @@ describe('connection', () => {

it('should fail if reusing connections with invalid options', () => {
const errMsg = Queue.ErrorMessages.MISSING_REDIS_OPTS;
{
try {
const client = new redis();

const opts = {
createClient(type) {
switch (type) {
case 'client':
return client;
default:
return new redis();
}
}
};
utils.buildQueue('external connections', opts);
throw new Error('should fail with invalid redis options');
} catch (err) {
expect(err.message).to.be.equal(errMsg);
}
}
{
const subscriber = new redis();

const opts = {
createClient(type) {
switch (type) {
case 'subscriber':
return subscriber;
default:
return new redis({
maxRetriesPerRequest: null,
enableReadyCheck: false
});
}
}
};

const testQueue = utils.buildQueue('external connections', opts);
const client = new Redis();

try {
testQueue.on('global:completed', () => {});
} catch (err) {
expect(err.message).to.be.equal(errMsg);
testQueue.close();
const opts = {
createClient(type) {
switch (type) {
case 'client':
return client;
default:
return new Redis();
}
}
};
const queue = utils.buildQueue('external connections', opts);
expect(queue).to.be.ok();

try {
// eslint-disable-next-line no-unused-vars
const _ = queue.bclient;
throw new Error('should fail with invalid redis options');
} catch (err) {
expect(err.message).to.be.equal(errMsg);
}

try {
// eslint-disable-next-line no-unused-vars
const _ = queue.eclient;
throw new Error('should fail with invalid redis options');
} catch (err) {
expect(err.message).to.be.equal(errMsg);
}
});

it('should recover from a connection loss', async () => {
const queue = utils.buildQueue();
queue.on('error', () => {
// error event has to be observed or the exception will bubble up
});
Expand Down Expand Up @@ -104,6 +86,8 @@ describe('connection', () => {

it('should handle jobs added before and after a redis disconnect', done => {
let count = 0;
const queue = utils.buildQueue();

queue
.process((job, jobDone) => {
if (count == 0) {
Expand Down Expand Up @@ -141,8 +125,8 @@ describe('connection', () => {
enableReadyCheck: false
};

const client = new redis(redisOpts);
const subscriber = new redis(redisOpts);
const client = new Redis(redisOpts);
const subscriber = new Redis(redisOpts);

const opts = {
createClient(type) {
Expand All @@ -152,7 +136,7 @@ describe('connection', () => {
case 'subscriber':
return subscriber;
default:
return new redis();
return new Redis();
}
}
};
Expand Down Expand Up @@ -184,31 +168,28 @@ describe('connection', () => {
});
});

it('should fail if redis connection fails and does not reconnect', done => {
queue = utils.buildQueue('connection fail', {
it('should fail if redis connection fails and does not reconnect', async () => {
const queue = utils.buildQueue('connection fail 123', {
redis: {
host: 'localhost',
port: 1234,
retryStrategy: () => false
}
});

isRedisReady(queue.client).then(
() => {
done(new Error('Did not fail connecting to invalid redis instance'));
},
err => {
expect(err.code).to.be.eql('ECONNREFUSED');
queue.close().then(done, done);
}
);
try {
await isRedisReady(queue.client);
new Error('Did not fail connecting to invalid redis instance');
} catch (err) {
expect(err.code).to.be.eql('ECONNREFUSED');
await queue.close();
}
});

it('should close cleanly if redis connection fails', async () => {
queue = utils.buildQueue('connection fail', {
const queue = new Queue('connection fail', {
redis: {
host: 'localhost',
port: 1234,
port: 1235,
retryStrategy: () => false
}
});
Expand All @@ -217,7 +198,7 @@ describe('connection', () => {
});

it('should accept ioredis options on the query string', async () => {
queue = new Queue(
const queue = new Queue(
'connection query string',
'redis://localhost?tls=RedisCloudFixed'
);
Expand Down
30 changes: 16 additions & 14 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ describe('Queue', () => {
return client;
case 'subscriber':
return subscriber;
case 'bclient':
return new redis({ ...opts, ...redisOpts });
default:
return new redis(opts);
}
Expand Down Expand Up @@ -615,15 +617,15 @@ describe('Queue', () => {
describe('.retryJobs', () => {
it('should retry all failed jobs', async () => {
const jobCount = 8;

let fail = true;
queue.process(async () => {
await delay(10);
if (fail) {
throw new Error('failed');
}
if (fail) {
throw new Error('failed');
}
});

let order = 0;
const failing = new Promise(resolve => {
queue.on('failed', job => {
Expand All @@ -634,16 +636,16 @@ describe('Queue', () => {
order++;
});
});

for (const index of Array.from(Array(jobCount).keys())) {
await queue.add({ idx: index });
}

await failing;

const failedCount = await queue.getJobCounts('failed');
expect(failedCount.failed).to.be.equal(jobCount);

order = 0;
const completing = new Promise(resolve => {
queue.on('completed', job => {
Expand All @@ -654,17 +656,17 @@ describe('Queue', () => {
order++;
});
});

fail = false;
await queue.retryJobs({ count: 2 });

await completing;

const CompletedCount = await queue.getJobCounts('completed');
expect(CompletedCount.completed).to.be.equal(jobCount);
});
});
});

it('should keep specified number of jobs after completed with removeOnComplete', async () => {
const keepJobs = 3;
await testRemoveOnFinish(keepJobs, keepJobs);
Expand Down

0 comments on commit d3b9138

Please sign in to comment.