diff --git a/src/Illuminate/Queue/Connectors/RedisConnector.php b/src/Illuminate/Queue/Connectors/RedisConnector.php index 966fe49071a8..d442eea99f11 100644 --- a/src/Illuminate/Queue/Connectors/RedisConnector.php +++ b/src/Illuminate/Queue/Connectors/RedisConnector.php @@ -47,7 +47,8 @@ public function connect(array $config) $config['connection'] ?? $this->connection, $config['retry_after'] ?? 60, $config['block_for'] ?? null, - $config['after_commit'] ?? null + $config['after_commit'] ?? null, + $config['migration_batch_size'] ?? -1 ); } } diff --git a/src/Illuminate/Queue/LuaScripts.php b/src/Illuminate/Queue/LuaScripts.php index fa278426bdbb..5452c116c198 100644 --- a/src/Illuminate/Queue/LuaScripts.php +++ b/src/Illuminate/Queue/LuaScripts.php @@ -106,7 +106,7 @@ public static function migrateExpiredJobs() { return <<<'LUA' -- Get all of the jobs with an expired "score"... -local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) +local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, ARGV[2]) -- If we have values in the array, we will remove them from the first queue -- and add them onto the destination queue in chunks of 100, which moves diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 79efc0581f24..87e19f3ecdf8 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -45,6 +45,15 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue */ protected $blockFor = null; + /** + * The batch size to use when migrating delayed / expired jobs onto the primary queue. + * + * Negative values are infinite. + * + * @var int + */ + protected $migrationBatchSize = -1; + /** * Create a new Redis queue instance. * @@ -54,6 +63,7 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * @param int $retryAfter * @param int|null $blockFor * @param bool $dispatchAfterCommit + * @param int $migrationBatchSize * @return void */ public function __construct(Redis $redis, @@ -61,7 +71,8 @@ public function __construct(Redis $redis, $connection = null, $retryAfter = 60, $blockFor = null, - $dispatchAfterCommit = false) + $dispatchAfterCommit = false, + $migrationBatchSize = -1) { $this->redis = $redis; $this->default = $default; @@ -69,6 +80,7 @@ public function __construct(Redis $redis, $this->connection = $connection; $this->retryAfter = $retryAfter; $this->dispatchAfterCommit = $dispatchAfterCommit; + $this->migrationBatchSize = $migrationBatchSize; } /** @@ -244,7 +256,7 @@ protected function migrate($queue) public function migrateExpiredJobs($from, $to) { return $this->getConnection()->eval( - LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime() + LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime(), $this->migrationBatchSize ); }