Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.x] Add Horizon command to clear queue #892

Merged
merged 2 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/Console/ClearCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

namespace Laravel\Horizon\Console;

use Illuminate\Console\Command;
use Illuminate\Console\ConfirmableTrait;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\Arr;
use Laravel\Horizon\RedisQueue;
use Laravel\Horizon\Repositories\RedisJobRepository;

class ClearCommand extends Command
{
use ConfirmableTrait;

/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'horizon:clear
{--queue= : The name of the queue to clear}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Delete all of the jobs from the specified queue';

/**
* Execute the console command.
*
* @return int|null
*/
public function handle(RedisJobRepository $jobRepository, QueueManager $manager)
{
if (! $this->confirmToProceed()) {
return 1;
}

if (! method_exists(RedisQueue::class, 'clear')) {
$this->line('<error>Clearing queues is not supported on this version of Laravel</error>');

return 1;
}

$connection = Arr::first($this->laravel['config']->get('horizon.defaults'))['connection'] ?? 'redis';

$jobRepository->purge($queue = $this->getQueue($connection));

$count = $manager->connection($connection)->clear($queue);
$this->line('<info>Cleared '.$count.' jobs from the ['.$queue.'] queue</info> ');

return 0;
}

/**
* Get the queue name to clear.
*
* @param string $connection
* @return string
*/
protected function getQueue($connection)
{
return $this->option('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);
}
}
1 change: 1 addition & 0 deletions src/HorizonServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ protected function registerCommands()
{
if ($this->app->runningInConsole()) {
$this->commands([
Console\ClearCommand::class,
Console\ContinueCommand::class,
Console\HorizonCommand::class,
Console\InstallCommand::class,
Expand Down
42 changes: 42 additions & 0 deletions src/LuaScripts.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,48 @@ public static function updateMetrics()
end

redis.call('hmset', KEYS[1], 'throughput', throughput, 'runtime', runtime)
LUA;
}

/**
* Get the Lua script for purging recent and pending jobs off of the queue.
*
* KEYS[1] - The name of the recent jobs sorted set
* KEYS[2] - The name of the pending jobs sorted set
* ARGV[1] - The prefix of the Horizon keys
* ARGV[2] - The name of the queue to purge
*
* @return string
*/
public static function purge()
{
return <<<'LUA'

local count = 0
local cursor = 0

repeat
-- Iterate over the recent jobs sorted set
local scanner = redis.call('zscan', KEYS[1], cursor)
cursor = scanner[1]

for i = 1, #scanner[2], 2 do
local jobid = scanner[2][i]
local hashkey = ARGV[1] .. jobid
local job = redis.call('hmget', hashkey, 'status', 'queue')

-- Delete the pending/reserved jobs, that match the queue
-- name, from the sorted sets as well as the job hash
if((job[1] == 'reserved' or job[1] == 'pending') and job[2] == ARGV[2]) then
redis.call('zrem', KEYS[1], jobid)
redis.call('zrem', KEYS[2], jobid)
redis.call('del', hashkey)
count = count + 1
end
end
until cursor == '0'

return count
LUA;
}
}
14 changes: 14 additions & 0 deletions src/Repositories/RedisJobRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Illuminate\Support\Collection;
use Laravel\Horizon\Contracts\JobRepository;
use Laravel\Horizon\JobPayload;
use Laravel\Horizon\LuaScripts;

class RedisJobRepository implements JobRepository
{
Expand Down Expand Up @@ -688,6 +689,19 @@ public function deleteFailed($id)
$this->connection()->del($id);
}

/**
* Delete pending and reserved jobs for a queue.
*
* @param string $queue
* @return int
*/
public function purge($queue)
{
return $this->connection()->eval(LuaScripts::purge(), 2,
'recent_jobs', 'pending_jobs', config('horizon.prefix'), $queue
);
}

/**
* Get the Redis connection instance.
*
Expand Down
24 changes: 24 additions & 0 deletions tests/Feature/RedisJobRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,28 @@ public function test_it_saves_microseconds_as_a_float_and_disregards_the_locale(
throw $e;
}
}

public function test_it_removes_recent_jobs_when_queue_is_purged()
{
$repository = $this->app->make(JobRepository::class);

$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 1, 'displayName' => 'first'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 2, 'displayName' => 'second'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 3, 'displayName' => 'third'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 4, 'displayName' => 'fourth'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 5, 'displayName' => 'fifth'])));

$repository->completed(new JobPayload(json_encode(['id' => 1, 'displayName' => 'first'])));
$repository->completed(new JobPayload(json_encode(['id' => 2, 'displayName' => 'second'])));

$this->assertEquals(3, $repository->purge('email-processing'));
$this->assertEquals(2, $repository->countRecent());
$this->assertEquals(0, $repository->countPending());
$this->assertEquals(2, $repository->countCompleted());

$recent = collect($repository->getRecent());
$this->assertNotNull($recent->firstWhere('id', 1));
$this->assertNotNull($recent->firstWhere('id', 2));
$this->assertCount(2, $repository->getJobs([1, 2, 3, 4, 5]));
}
}