Skip to content

Commit

Permalink
Merge branch 'clearqueue_command' into 5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Sep 22, 2020
2 parents 70b48b5 + 05e3a1c commit db5adb1
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 0 deletions.
71 changes: 71 additions & 0 deletions src/Console/ClearCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

namespace Laravel\Horizon\Console;

use Illuminate\Console\Command;
use Illuminate\Console\ConfirmableTrait;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\Arr;
use Laravel\Horizon\RedisQueue;
use Laravel\Horizon\Repositories\RedisJobRepository;

class ClearCommand extends Command
{
use ConfirmableTrait;

/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'horizon:clear
{--queue= : The name of the queue to clear}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Delete all of the jobs from the specified queue';

/**
* Execute the console command.
*
* @return int|null
*/
public function handle(RedisJobRepository $jobRepository, QueueManager $manager)
{
if (! $this->confirmToProceed()) {
return 1;
}

if (! method_exists(RedisQueue::class, 'clear')) {
$this->line('<error>Clearing queues is not supported on this version of Laravel</error>');

return 1;
}

$connection = Arr::first($this->laravel['config']->get('horizon.defaults'))['connection'] ?? 'redis';

$jobRepository->purge($queue = $this->getQueue($connection));

$count = $manager->connection($connection)->clear($queue);

$this->line('<info>Cleared '.$count.' jobs from the ['.$queue.'] queue</info> ');

return 0;
}

/**
* Get the queue name to clear.
*
* @param string $connection
* @return string
*/
protected function getQueue($connection)
{
return $this->option('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);
}
}
1 change: 1 addition & 0 deletions src/HorizonServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ protected function registerCommands()
{
if ($this->app->runningInConsole()) {
$this->commands([
Console\ClearCommand::class,
Console\ContinueCommand::class,
Console\HorizonCommand::class,
Console\InstallCommand::class,
Expand Down
42 changes: 42 additions & 0 deletions src/LuaScripts.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,48 @@ public static function updateMetrics()
end
redis.call('hmset', KEYS[1], 'throughput', throughput, 'runtime', runtime)
LUA;
}

/**
* Get the Lua script for purging recent and pending jobs off of the queue.
*
* KEYS[1] - The name of the recent jobs sorted set
* KEYS[2] - The name of the pending jobs sorted set
* ARGV[1] - The prefix of the Horizon keys
* ARGV[2] - The name of the queue to purge
*
* @return string
*/
public static function purge()
{
return <<<'LUA'
local count = 0
local cursor = 0
repeat
-- Iterate over the recent jobs sorted set
local scanner = redis.call('zscan', KEYS[1], cursor)
cursor = scanner[1]
for i = 1, #scanner[2], 2 do
local jobid = scanner[2][i]
local hashkey = ARGV[1] .. jobid
local job = redis.call('hmget', hashkey, 'status', 'queue')
-- Delete the pending/reserved jobs, that match the queue
-- name, from the sorted sets as well as the job hash
if((job[1] == 'reserved' or job[1] == 'pending') and job[2] == ARGV[2]) then
redis.call('zrem', KEYS[1], jobid)
redis.call('zrem', KEYS[2], jobid)
redis.call('del', hashkey)
count = count + 1
end
end
until cursor == '0'
return count
LUA;
}
}
19 changes: 19 additions & 0 deletions src/Repositories/RedisJobRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Illuminate\Support\Collection;
use Laravel\Horizon\Contracts\JobRepository;
use Laravel\Horizon\JobPayload;
use Laravel\Horizon\LuaScripts;

class RedisJobRepository implements JobRepository
{
Expand Down Expand Up @@ -688,6 +689,24 @@ public function deleteFailed($id)
$this->connection()->del($id);
}

/**
* Delete pending and reserved jobs for a queue.
*
* @param string $queue
* @return int
*/
public function purge($queue)
{
return $this->connection()->eval(
LuaScripts::purge(),
2,
'recent_jobs',
'pending_jobs',
config('horizon.prefix'),
$queue
);
}

/**
* Get the Redis connection instance.
*
Expand Down
24 changes: 24 additions & 0 deletions tests/Feature/RedisJobRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,28 @@ public function test_it_saves_microseconds_as_a_float_and_disregards_the_locale(
throw $e;
}
}

public function test_it_removes_recent_jobs_when_queue_is_purged()
{
$repository = $this->app->make(JobRepository::class);

$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 1, 'displayName' => 'first'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 2, 'displayName' => 'second'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 3, 'displayName' => 'third'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 4, 'displayName' => 'fourth'])));
$repository->pushed('horizon', 'email-processing', new JobPayload(json_encode(['id' => 5, 'displayName' => 'fifth'])));

$repository->completed(new JobPayload(json_encode(['id' => 1, 'displayName' => 'first'])));
$repository->completed(new JobPayload(json_encode(['id' => 2, 'displayName' => 'second'])));

$this->assertEquals(3, $repository->purge('email-processing'));
$this->assertEquals(2, $repository->countRecent());
$this->assertEquals(0, $repository->countPending());
$this->assertEquals(2, $repository->countCompleted());

$recent = collect($repository->getRecent());
$this->assertNotNull($recent->firstWhere('id', 1));
$this->assertNotNull($recent->firstWhere('id', 2));
$this->assertCount(2, $repository->getJobs([1, 2, 3, 4, 5]));
}
}

0 comments on commit db5adb1

Please sign in to comment.