Skip to content

Commit

Permalink
Merge pull request #20 from aloware/bugfix/fair-queue-improvements
Browse files Browse the repository at this point in the history
Fix bugs
  • Loading branch information
hamed-aloware authored Jun 15, 2022
2 parents 30b13ff + 576deb1 commit 3ab7fa8
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 14 deletions.
2 changes: 1 addition & 1 deletion public/app.js

Large diffs are not rendered by default.

45 changes: 44 additions & 1 deletion resources/js/screens/failed-queues/queue-partitions.vue
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
data() {
return {
ready: false,
saving: false,
loadingNewEntries: false,
hasNewEntries: false,
isModalVisible: false,
amount: '',
page: 1,
perPage: 3,
totalPages: 1,
selectedPartition: null,
partitions: []
};
},
Expand Down Expand Up @@ -77,6 +79,20 @@
});
},
submitRetryAll(partition) {
this.saving = true;
this.$http.post(FairQueue.basePath + '/api/queues/' + this.$route.params.queue + '/partitions/' + this.selectedPartition + '/retry-failed-jobs')
// this.$http.post(FairQueue.basePath + '/api/jobs/retry-failed-jobs')
.then(response => {
this.saving = false;
this.closeModal();
this.$toasted.show(response.data.count + ' Jobs Returned to The Queue');
})
.catch(error => {
this.saving = false;
});
},
/**
* Refresh the jobs every period of time.
*/
Expand All @@ -91,7 +107,8 @@
}, 3000);
},
showModal() {
showModal(partition) {
this.selectedPartition = partition
this.isModalVisible = true;
},
closeModal() {
Expand Down Expand Up @@ -121,6 +138,7 @@
<tr>
<th>Failed Partition Name</th>
<th>Number Of Jobs</th>
<th></th>
</tr>
</thead>

Expand All @@ -142,10 +160,35 @@
<td>
<span>{{ partition.count }}</span>
</td>
<td>
<button @click="showModal(partition.name)" class="btn btn-primary btn-sm">Retry All</button>
</td>
</tr>
</tbody>
</table>

<Modal
v-show="isModalVisible"
@close="closeModal"
>
<template v-slot:header>
Retry All
</template>

<template v-slot:body>
<div style="width: 250px">Are you sure?</div>
</template>

<template v-slot:footer>
<button @click="submitRetryAll" :disabled="saving" class="btn btn-primary btn-sm">
<svg v-if="saving" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 20 20" class="icon spin mr-2 fill-text-color">
<path d="M12 10a2 2 0 0 1-3.41 1.41A2 2 0 0 1 10 8V0a9.97 9.97 0 0 1 10 10h-8zm7.9 1.41A10 10 0 1 1 8.59.1v2.03a8 8 0 1 0 9.29 9.29h2.02zm-4.07 0a6 6 0 1 1-7.25-7.25v2.1a3.99 3.99 0 0 0-1.4 6.57 4 4 0 0 0 6.56-1.42h2.1z"></path>
</svg>
<span v-else>Confirm</span>
</button>
</template>
</Modal>

</div>

</template>
4 changes: 2 additions & 2 deletions resources/js/screens/queues/queue-partitions.vue
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
<tr>
<th>Partition Name</th>
<th>Number Of Jobs</th>
<th>n/s</th>
<th>n/m</th>
</tr>
</thead>

Expand All @@ -144,7 +144,7 @@
<span>{{ partition.count }}</span>
</td>
<td>
<span>{{ partition.per_second }}</span>
<span>{{ partition.per_minute }}</span>
</td>
</tr>
</tbody>
Expand Down
27 changes: 27 additions & 0 deletions src/Http/Controllers/QueueController.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,33 @@ public function retryFailedJobs()
]);
}

/**
* Retry Lost Jobs of a Specific Partition
*
* @return array
*/
public function retryPartitionFailedJobs($queue, $partition)
{
$count = FairQueue::retryPartitionFailedJobs($queue, $partition);
return response()->json([
'count' => $count
]);
}

/**
* Recover Lost Jobs of a Specific Partition
*
* @return array
*/
public function recoverPartitionLostJobs($queue, $partition)
{
$recovered_count = FairQueue::recoverPartitionLost($queue, $partition, request()->amount);

return response()->json([
'recovered' => $recovered_count
]);
}

public function purgeFailedJobs()
{
FairQueue::purgeFailedJobs();
Expand Down
4 changes: 4 additions & 0 deletions src/Interfaces/RepositoryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public function totalFailedJobsCount($queues);

public function retryFailedJobs();

public function retryPartitionFailedJobs($queue, $partition);

public function recoverPartitionLost($queue, $partition);

public function purgeFailedJobs();

public function recoverLost($age = 300);
Expand Down
17 changes: 16 additions & 1 deletion src/Repositories/RedisKeys.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ private function partitionKey($queue, $partition, $prefix = '')
private function queueKey($queue, $prefix = '')
{
return sprintf(
'%s%s:*:%s',
'%s%s:%s:*',
$this->fairQueueKeyPrefix(),
$prefix,
$queue
Expand All @@ -41,6 +41,12 @@ private function queueProcessedJobsInPastMinutesKey($queue, $minutes = 1)
return "{$queue_key}:processed:{$minutes}min";
}

private function processedJobsInPastMinutesKey($minutes = 1)
{
$queue_key = $this->queueKey('*', '-internal');
return "{$queue_key}:processed:{$minutes}min";
}

private function partitionProcessedJobsInPastMinutesKey($queue, $partition, $minutes = 1)
{
$partition_key = $this->partitionKey($queue, $partition, '-internal');
Expand Down Expand Up @@ -132,6 +138,15 @@ private function inProgressJobsPattern()
);
}

private function partitionInProgressJobKey($queue, $partition)
{
return sprintf(
'%s-inprogress:*:%s:%s',
$queue,
$partition
);
}

private function extractQueueNameFromPartitionKey($partitionKey)
{
$rep = $this->removePrefix($this->fairQueueKeyPrefix() . ':', $partitionKey);
Expand Down
77 changes: 68 additions & 9 deletions src/Repositories/RedisRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,23 @@ public function processedJobsInPastMinutes($queues, $minutes)
$redis = $this->getConnection();

$total = 0;
foreach($queues as $queue)
$queue_key = $this->processedJobsInPastMinutesKey($minutes);
$keys = $redis->keys($queue_key);
foreach($keys as $key)
{
$queue_key = $this->queueProcessedJobsInPastMinutesKey($queue, $minutes);
$total += $redis->zcard($queue_key);
$total += $redis->zcard($key);
}
return $total;
}

public function partitionProcessedJobsInPastMinutes($queue, $partition, $minutes)
{
$redis = $this->getConnection();

$partition_key = $this->partitionProcessedJobsInPastMinutesKey($queue, $partition, $minutes);
return $redis->zcard($partition_key);
}

public function failedPartitions($queue)
{
return $this->partitionsPrivate(
Expand Down Expand Up @@ -195,6 +204,23 @@ public function retryFailedJobs()
return $count;
}

public function retryPartitionFailedJobs($queue, $partition)
{
$count = 0;

$queueSize = 0;

while ($job = $this->popFailed($queue, $partition)) {
$this->push($queue, $partition, $job);
$queueSize++;
$count++;
}

$this->generateFakeSignals($queue, $queueSize);

return $count;
}

public function purgeFailedJobs()
{
$redis = $this->getConnection();
Expand Down Expand Up @@ -247,6 +273,43 @@ public function recoverLost($age = 300)
return $count;
}

public function recoverPartitionLost($queue, $partition, $age = 300)
{
$redis = $this->getConnection();

$pattern = $this->partitionInProgressJobKey($queue, $partition);
$keys = $redis->keys($pattern);

$count = 0;

foreach ($keys as $key) {
list ($connection, $queue, $partition, $jobUuid) = $this->extractInProgressJobKey($key);

$inProgressJobKey = $this->inProgressJobKey($connection, $queue, $partition, $jobUuid);

$lastAccess = $redis->object('idletime', $inProgressJobKey);
if ($lastAccess < $age) {
continue;
}

// restore the job into partition
$this->push($queue, $partition, $redis->get($inProgressJobKey));
//

// and generate fake signal
$dispatch = dispatch(new FairSignalJob(null))->onQueue($queue);

if (!empty($connection)) {
$dispatch->onConnection($connection);
}
//

$count++;
}

return $count;
}

/**
* @throws SampleNotFoundException
*/
Expand Down Expand Up @@ -344,11 +407,7 @@ private function partitionsWithCountPrivate(
];

if ($includePartitionPerSecKeyColumn) {
$partitionPerSecKey = $this->$partitionPerSecKeyResolver($queue, $partition);

list ($lastAccess, $lastPersec) = explode(',', $redis->get($partitionPerSecKey) ?? '0,0');

$item['per_second'] = $lastPersec;
$item['per_minute'] = $this->partitionProcessedJobsInPastMinutes($queue, $partition, 1);
}

$partitions[] = $item;
Expand Down Expand Up @@ -425,7 +484,7 @@ private function popPrivate($queue, $partition, $partitionKeyResolver = 'partiti

$partitionKey = $this->$partitionKeyResolver($queue, $partition);

$processedKey = $this->partitionProcessedCountJobKey($queue, $partition);
$processedKey = $this->partitionProcessedCountJobKey($queue, $partition);
$partitionPerSecKey = $this->partitionPerSecKey($queue, $partition);

$redis->incr($processedKey);
Expand Down

0 comments on commit 3ab7fa8

Please sign in to comment.