Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis-list-driver-implementation #8

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Support for PHP 8.3 and Symfony 7

- RedisProxyListDriver - list driver for RedisProxy

## [1.0.0] - 2023-09-06
### Added
- redis-proxy 1.0 support
Expand Down
144 changes: 144 additions & 0 deletions src/Driver/RedisProxyListDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
<?php

declare(strict_types=1);

namespace Efabrica\HermesExtension\Driver;

use Closure;
use Efabrica\HermesExtension\Heartbeat\HeartbeatBehavior;
use Efabrica\HermesExtension\Heartbeat\HermesProcess;
use RedisProxy\RedisProxy;
use RedisProxy\RedisProxyException;
use Tomaj\Hermes\Dispatcher;
use Tomaj\Hermes\Driver\DriverInterface;
use Tomaj\Hermes\Driver\MaxItemsTrait;
use Tomaj\Hermes\Driver\SerializerAwareTrait;
use Tomaj\Hermes\Driver\ShutdownTrait;
use Tomaj\Hermes\Driver\UnknownPriorityException;
use Tomaj\Hermes\MessageInterface;
use Tomaj\Hermes\MessageSerializer;
use Tomaj\Hermes\Shutdown\ShutdownException;
use Tomaj\Hermes\SerializeException;

final class RedisProxyListDriver implements DriverInterface
{
use MaxItemsTrait;
use ShutdownTrait;
use SerializerAwareTrait;
use HeartbeatBehavior;

/** @var array<int, string> */
private array $queues = [];

private RedisProxy $redis;

private float $refreshInterval;

public function __construct(RedisProxy $redis, string $key, float $refreshInterval = 1)
{
$this->setupPriorityQueue($key, Dispatcher::DEFAULT_PRIORITY);

$this->redis = $redis;
$this->refreshInterval = $refreshInterval;
$this->serializer = new MessageSerializer();
}

/**
* @throws RedisProxyException
* @throws SerializeException
* @throws UnknownPriorityException
*/
public function send(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool
{
$key = $this->getKey($priority);
return (bool)$this->redis->rpush($key, $this->serializer->serialize($message));
}

public function setupPriorityQueue(string $name, int $priority): void
{
$this->queues[$priority] = $name;
}

/**
* @throws RedisProxyException
* @throws SerializeException
* @throws ShutdownException
* @throws UnknownPriorityException
*/
public function wait(Closure $callback, array $priorities = []): void
{
$queues = $this->queues;
krsort($queues);
while (true) {
$this->checkShutdown();
$this->checkToBeKilled();
if (!$this->shouldProcessNext()) {
break;
}

$messageString = null;
$foundPriority = null;

foreach ($queues as $priority => $name) {
if (count($priorities) > 0 && !in_array($priority, $priorities, true)) {
continue;
}
$key = $this->getKey($priority);
$length = $this->getLength($key);
if ($length === 0) {
continue;
}
$foundPriority = $priority;
for ($i = 0; $i < $length; $i++) {
$messageString = $this->pop($key);

if ($messageString !== null) {
$this->ping(HermesProcess::STATUS_PROCESSING);
$message = $this->serializer->unserialize($messageString);
$callback($message, $foundPriority);
$this->incrementProcessedItems();
}
}
}

if ($this->refreshInterval) {
$this->checkShutdown();
$this->checkToBeKilled();
$this->ping(HermesProcess::STATUS_IDLE);
usleep(intval($this->refreshInterval * 1000000));
}
}
}

/**
* @throws UnknownPriorityException
*/
private function getKey(int $priority): string
{
if (!isset($this->queues[$priority])) {
throw new UnknownPriorityException("Unknown priority {$priority}");
}
return $this->queues[$priority];
}

/**
* @throws RedisProxyException
*/
private function pop(string $key): ?string
{
$messageString = $this->redis->lpop($key);
if (is_string($messageString) && $messageString !== '') {
return $messageString;
}

return null;
}

/**
* @throws RedisProxyException
*/
private function getLength(string $key): int
{
return $this->redis->llen($key);
}
}
Loading