Skip to content

Commit

Permalink
fair-queue improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
hamed-aloware committed Jun 14, 2022
1 parent c0feb15 commit 65b64d9
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 9 deletions.
2 changes: 1 addition & 1 deletion public/app.js

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions resources/js/screens/dashboard.vue
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,39 @@
</div>
</div>
</div>
<div class="card-bg-secondary">
<div class="d-flex">
<div class="w-50 border-right border-bottom">
<div class="p-4">
<small class="text-uppercase">Processed Jobs Past Minute</small>

<h4 class="mt-4 mb-0">
{{ stats.processedJobsInPastMinute ? stats.processedJobsInPastMinute.toLocaleString() : 0 }}
</h4>
</div>
</div>

<div class="w-50 border-right border-bottom">
<div class="p-4">
<small class="text-uppercase">Processed Jobs Past 20 Minutes</small>

<h4 class="mt-4 mb-0">
{{ stats.processedJobsInPast20Minutes ? stats.processedJobsInPast20Minutes.toLocaleString() : 0 }}
</h4>
</div>
</div>

<div class="w-50 border-right border-bottom">
<div class="p-4">
<small class="text-uppercase">Processed Jobs Past Hour</small>

<h4 class="mt-4 mb-0">
{{ stats.processedJobsInPastHour ? stats.processedJobsInPastHour.toLocaleString() : 0 }}
</h4>
</div>
</div>
</div>
</div>
</div>

<div class="card mt-4" v-if="workload.length">
Expand Down
73 changes: 73 additions & 0 deletions src/Commands/RefreshStats.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Aloware\FairQueue\Commands;

use Aloware\FairQueue\Facades\FairQueue;
use Aloware\FairQueue\Interfaces\RepositoryInterface;
use Aloware\FairQueue\Repositories\RedisKeys;
use Illuminate\Console\Command;

class RefreshStats extends Command
{
use RedisKeys;
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'fair-queue:refresh-stats';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Refresh Jobs Stats';

/**
* @var RepositoryInterface
*/
private $repository;

/**
* Create a new command instance.
*
* @return void
*/
public function __construct(RepositoryInterface $repository)
{
parent::__construct();

$this->repository = $repository;
}

/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$redis = FairQueue::getConnection();

$this->refreshKeys($redis, $redis->keys($this->recentProcesedJobsPattern(1)), 1);
$this->refreshKeys($redis, $redis->keys($this->recentProcesedJobsPattern(20)), 20);
$this->refreshKeys($redis, $redis->keys($this->recentProcesedJobsPattern(60)), 60);

$this->info('Fair-Queue Stats Refreshed Successfully');
}

/**
* Refresh Redis Keys
*
* @return mixed
*/
public function refreshKeys($redis, $keys, $minute = 1)
{
$timestamp = now()->subMinutes($minute)->getPreciseTimestamp(3);
foreach ($keys as $key)
{
$redis->zremrangebyscore($key, '-inf', "({$timestamp}");
}
}
}
33 changes: 33 additions & 0 deletions src/FairQueueServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@
use Aloware\FairQueue\Commands\PurgeFailedJobs;
use Aloware\FairQueue\Commands\RecoverLostJobs;
use Aloware\FairQueue\Commands\RetryFailedJobs;
use Aloware\FairQueue\Commands\RefreshStats;
use Aloware\FairQueue\Facades\FairQueue;
use Aloware\FairQueue\Repositories\RedisRepository;
use Aloware\FairQueue\Interfaces\RepositoryInterface;
use Aloware\FairQueue\Repositories\RedisKeys;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Support\ServiceProvider;
use Illuminate\Support\Facades\Route;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobProcessed;

class FairQueueServiceProvider extends ServiceProvider
{
use RedisKeys;
/**
* Register the service provider.
*
Expand Down Expand Up @@ -43,17 +48,21 @@ public function boot(): void
GenerateSignal::class,
RetryFailedJobs::class,
PurgeFailedJobs::class,
RefreshStats::class,
]);

$this->registerRoutes();
$this->registerResources();
$this->publishAssets();
$this->registerQueueEvents();

$this->app->booted(function () {
/** @var Schedule $schedule */
$schedule = $this->app->make(Schedule::class);

$schedule->command(RecoverLostJobs::class)->hourly();
$schedule->command(RecoverLostJobs::class)->hourly();
$schedule->command(RefreshStats::class)->everyMinute();
});
}

Expand All @@ -73,6 +82,30 @@ protected function registerRoutes(): void
});
}

/**
* Register the FairQueue Queue Events.
*
* @return void
*/
protected function registerQueueEvents(): void
{
Queue::after(function (JobProcessed $event) {
$redis = FairQueue::getConnection();
$payload = $event->job->payload();
$command = unserialize($payload['data']['command']);
if(!$command instanceof FairSignalJob)
return;
$queue = $command->queue;
$partition = $command->partition;
$past_minute_key = $this->partitionProcessedJobsInPastMinutesKey($queue, $partition, 1);
$past_20minute_key = $this->partitionProcessedJobsInPastMinutesKey($queue, $partition, 20);
$past_60minute_key = $this->partitionProcessedJobsInPastMinutesKey($queue, $partition, 60);
$redis->zadd($past_minute_key, now()->getPreciseTimestamp(3), $payload['id']);
$redis->zadd($past_20minute_key, now()->getPreciseTimestamp(3), $payload['id']);
$redis->zadd($past_60minute_key, now()->getPreciseTimestamp(3), $payload['id']);
});
}

/**
* Register the FairQueue resources.
*
Expand Down
7 changes: 5 additions & 2 deletions src/Http/Controllers/DashboardStatsController.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ public function index()
$totalFailedJobs = FairQueue::totalFailedJobsCount($failedQueues);

return response()->json([
'totalJobs' => $totalJobs,
'totalFailedJobs' => $totalFailedJobs,
'totalJobs' => $totalJobs,
'totalFailedJobs' => $totalFailedJobs,
'totalQueues' => count($queues),
'processedJobsInPastMinute' => FairQueue::processedJobsInPastMinutes($queues, 1),
'processedJobsInPast20Minutes' => FairQueue::processedJobsInPastMinutes($queues, 20),
'processedJobsInPastHour' => FairQueue::processedJobsInPastMinutes($queues, 60),
]);
}

Expand Down
4 changes: 4 additions & 0 deletions src/Interfaces/RepositoryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public function job($queue, $partition, $index);

public function totalJobsCount($queues);

public function processedJobsInPastMinutes($queues, $minutes);

public function failedQueues();

public function failedQueuesWithPartitions();
Expand All @@ -55,6 +57,8 @@ public function purgeFailedJobs();

public function recoverLost($age = 300);

public function getConnection();

/**
* @throws SampleNotFoundException
*/
Expand Down
30 changes: 30 additions & 0 deletions src/Repositories/RedisKeys.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ private function partitionKey($queue, $partition, $prefix = '')
);
}

private function queueKey($queue, $prefix = '')
{
return sprintf(
'%s%s:*:%s',
$this->fairQueueKeyPrefix(),
$prefix,
$queue
);
}

private function failedPartitionKey($queue, $partition)
{
return $this->partitionKey($queue, $partition, '-failed');
Expand All @@ -25,6 +35,26 @@ private function partitionProcessedCountJobKey($queue, $partition)
return $this->partitionKey($queue, $partition, '-internal') . ':processed';
}

private function queueProcessedJobsInPastMinutesKey($queue, $minutes = 1)
{
$queue_key = $this->queueKey($queue, '-internal');
return "{$queue_key}:processed:{$minutes}min";
}

private function partitionProcessedJobsInPastMinutesKey($queue, $partition, $minutes = 1)
{
$partition_key = $this->partitionKey($queue, $partition, '-internal');
return "{$partition_key}:processed:{$minutes}min";
}

private function recentProcesedJobsPattern($minute = 1)
{
return sprintf(
'%s-internal:*:processed:' . $minute . 'min*',
$this->fairQueueKeyPrefix()
);
}

private function partitionPerSecKey($queue, $partition)
{
return $this->partitionKey($queue, $partition, '-internal') . ':persec';
Expand Down
25 changes: 19 additions & 6 deletions src/Repositories/RedisRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ public function totalJobsCount($queues)
return $this->totalJobsCountPrivate($queues, 'partitions', 'partitionKey');
}

public function processedJobsInPastMinutes($queues, $minutes)
{
$redis = $this->getConnection();

$total = 0;
foreach($queues as $queue)
{
$queue_key = $this->queueProcessedJobsInPastMinutesKey($queue, $minutes);
$total += $redis->zcard($queue_key);
}
return $total;
}

public function failedPartitions($queue)
{
return $this->partitionsPrivate(
Expand Down Expand Up @@ -202,7 +215,7 @@ public function recoverLost($age = 300)
$redis = $this->getConnection();

$pattern = $this->inProgressJobsPattern();
$keys = $redis->keys($pattern);
$keys = $redis->keys($pattern);

$count = 0;

Expand Down Expand Up @@ -317,7 +330,7 @@ private function partitionsWithCountPrivate(

$pattern = $this->$queuePartitionListPatternResolver($queue);

$keys = $redis->keys($pattern);
$keys = $redis->keys($pattern);
$partitions = [];

foreach ($keys as $key) {
Expand Down Expand Up @@ -346,8 +359,8 @@ private function partitionsWithCountPrivate(

private function jobsPrivate($queue, $partition, $partitionKeyResolver = 'partitionKey')
{
$redis = $this->getConnection();
$perPage = request('limit', 25);
$redis = $this->getConnection();
$perPage = request('limit', 25);
$startingAt = request('starting_at', 0);

$partitionKey = $this->$partitionKeyResolver($queue, $partition);
Expand Down Expand Up @@ -433,9 +446,9 @@ private function popPrivate($queue, $partition, $partitionKeyResolver = 'partiti
return $redis->lpop($partitionKey);
}

private function getConnection()
public function getConnection()
{
$database = config('fair-queue.database');
return Redis::connection($database);
}
}
}

0 comments on commit 65b64d9

Please sign in to comment.