diff --git a/src/Console/ClearCommand.php b/src/Console/ClearCommand.php new file mode 100644 index 00000000..7e16ff2e --- /dev/null +++ b/src/Console/ClearCommand.php @@ -0,0 +1,71 @@ +confirmToProceed()) { + return 1; + } + + if (! method_exists(RedisQueue::class, 'clear')) { + $this->line('Clearing queues is not supported on this version of Laravel'); + + return 1; + } + + $connection = Arr::first($this->laravel['config']->get('horizon.defaults'))['connection'] ?? 'redis'; + + $jobRepository->purge($queue = $this->getQueue($connection)); + + $count = $manager->connection($connection)->clear($queue); + + $this->line('Cleared '.$count.' jobs from the ['.$queue.'] queue '); + + return 0; + } + + /** + * Get the queue name to clear. + * + * @param string $connection + * @return string + */ + protected function getQueue($connection) + { + return $this->option('queue') ?: $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); + } +} diff --git a/src/HorizonServiceProvider.php b/src/HorizonServiceProvider.php index 17ed3991..691f780d 100644 --- a/src/HorizonServiceProvider.php +++ b/src/HorizonServiceProvider.php @@ -171,6 +171,7 @@ protected function registerCommands() { if ($this->app->runningInConsole()) { $this->commands([ + Console\ClearCommand::class, Console\ContinueCommand::class, Console\HorizonCommand::class, Console\InstallCommand::class, diff --git a/src/LuaScripts.php b/src/LuaScripts.php index 8258d6f3..ef260f03 100644 --- a/src/LuaScripts.php +++ b/src/LuaScripts.php @@ -32,6 +32,48 @@ public static function updateMetrics() end redis.call('hmset', KEYS[1], 'throughput', throughput, 'runtime', runtime) +LUA; + } + + /** + * Get the Lua script for purging recent and pending jobs off of the queue. + * + * KEYS[1] - The name of the recent jobs sorted set + * KEYS[2] - The name of the pending jobs sorted set + * ARGV[1] - The prefix of the Horizon keys + * ARGV[2] - The name of the queue to purge + * + * @return string + */ + public static function purge() + { + return <<<'LUA' + + local count = 0 + local cursor = 0 + + repeat + -- Iterate over the recent jobs sorted set + local scanner = redis.call('zscan', KEYS[1], cursor) + cursor = scanner[1] + + for i = 1, #scanner[2], 2 do + local jobid = scanner[2][i] + local hashkey = ARGV[1] .. jobid + local job = redis.call('hmget', hashkey, 'status', 'queue') + + -- Delete the pending/reserved jobs, that match the queue + -- name, from the sorted sets as well as the job hash + if((job[1] == 'reserved' or job[1] == 'pending') and job[2] == ARGV[2]) then + redis.call('zrem', KEYS[1], jobid) + redis.call('zrem', KEYS[2], jobid) + redis.call('del', hashkey) + count = count + 1 + end + end + until cursor == '0' + + return count LUA; } } diff --git a/src/Repositories/RedisJobRepository.php b/src/Repositories/RedisJobRepository.php index f749798f..75cb06a7 100644 --- a/src/Repositories/RedisJobRepository.php +++ b/src/Repositories/RedisJobRepository.php @@ -8,6 +8,7 @@ use Illuminate\Support\Collection; use Laravel\Horizon\Contracts\JobRepository; use Laravel\Horizon\JobPayload; +use Laravel\Horizon\LuaScripts; class RedisJobRepository implements JobRepository { @@ -688,6 +689,24 @@ public function deleteFailed($id) $this->connection()->del($id); } + /** + * Delete pending and reserved jobs for a queue. + * + * @param string $queue + * @return int + */ + public function purge($queue) + { + return $this->connection()->eval( + LuaScripts::purge(), + 2, + 'recent_jobs', + 'pending_jobs', + config('horizon.prefix'), + $queue + ); + } + /** * Get the Redis connection instance. * diff --git a/tests/Feature/RedisJobRepositoryTest.php b/tests/Feature/RedisJobRepositoryTest.php index 6f240a9e..b8162866 100644 --- a/tests/Feature/RedisJobRepositoryTest.php +++ b/tests/Feature/RedisJobRepositoryTest.php @@ -56,4 +56,28 @@ public function test_it_saves_microseconds_as_a_float_and_disregards_the_locale( throw $e; } } + + public function test_it_removes_recent_jobs_when_queue_is_purged() + { + $repository = $this->app->make(JobRepository::class); + + $repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 1, 'displayName' => 'first']))); + $repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 2, 'displayName' => 'second']))); + $repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 3, 'displayName' => 'third']))); + $repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 4, 'displayName' => 'fourth']))); + $repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 5, 'displayName' => 'fifth']))); + + $repository->completed(new JobPayload(json_encode(['id' => 1, 'displayName' => 'first']))); + $repository->completed(new JobPayload(json_encode(['id' => 2, 'displayName' => 'second']))); + + $this->assertEquals(3, $repository->purge('email-processing')); + $this->assertEquals(2, $repository->countRecent()); + $this->assertEquals(0, $repository->countPending()); + $this->assertEquals(2, $repository->countCompleted()); + + $recent = collect($repository->getRecent()); + $this->assertNotNull($recent->firstWhere('id', 1)); + $this->assertNotNull($recent->firstWhere('id', 2)); + $this->assertCount(2, $repository->getJobs([1, 2, 3, 4, 5])); + } }