diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 94602022e721..aa98fee5a747 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -54,6 +54,15 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue */ protected $migrationBatchSize = -1; + /** + * Indicates if a secondary queue had a job available between checks of the primary queue. + * + * Only applicable when monitoring multiple named queues with a single instance. + * + * @var bool + */ + protected $secondaryQueueHadJob = false; + /** * Create a new Redis queue instance. * @@ -221,13 +230,23 @@ protected function createPayloadArray($job, $queue, $data = '') * @param string|null $queue * @return \Illuminate\Contracts\Queue\Job|null */ - public function pop($queue = null) + public function pop($queue = null, $index = 0) { $this->migrate($prefixed = $this->getQueue($queue)); - [$job, $reserved] = $this->retrieveNextJob($prefixed); + $block = ! $this->secondaryQueueHadJob && $index == 0; + + [$job, $reserved] = $this->retrieveNextJob($prefixed, $block); + + if ($index == 0) { + $this->secondaryQueueHadJob = false; + } if ($reserved) { + if ($index > 0) { + $this->secondaryQueueHadJob = true; + } + return new RedisJob( $this->container, $this, $job, $reserved, $this->connectionName, $queue ?: $this->default diff --git a/src/Illuminate/Queue/Worker.php b/src/Illuminate/Queue/Worker.php index 83b4e284d40c..762d487a1212 100644 --- a/src/Illuminate/Queue/Worker.php +++ b/src/Illuminate/Queue/Worker.php @@ -346,8 +346,8 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options) */ protected function getNextJob($connection, $queue) { - $popJobCallback = function ($queue) use ($connection) { - return $connection->pop($queue); + $popJobCallback = function ($queue, $index = 0) use ($connection) { + return $connection->pop($queue, $index); }; $this->raiseBeforeJobPopEvent($connection->getConnectionName()); @@ -360,8 +360,8 @@ protected function getNextJob($connection, $queue) ); } - foreach (explode(',', $queue) as $queue) { - if (! is_null($job = $popJobCallback($queue))) { + foreach (explode(',', $queue) as $index => $queue) { + if (! is_null($job = $popJobCallback($queue, $index))) { $this->raiseAfterJobPopEvent($connection->getConnectionName(), $job); return $job;