Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
iamfarhad committed Jan 1, 2023
1 parent 5df4969 commit 8a11086
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 166 deletions.
28 changes: 28 additions & 0 deletions rector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector;
use Rector\Config\RectorConfig;
use Rector\Set\ValueObject\LevelSetList;
use Rector\Set\ValueObject\SetList;

return static function (RectorConfig $rectorConfig): void {
$rectorConfig->paths([
__DIR__ . '/Config',
__DIR__ . '/src',
]);

// register a single rule
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);

// define sets of rules
$rectorConfig->sets([
LevelSetList::UP_TO_PHP_81,
SetList::CODE_QUALITY,
SetList::CODING_STYLE,
SetList::NAMING,
SetList::PRIVATIZATION,
SetList::TYPE_DECLARATION
]);
};
45 changes: 21 additions & 24 deletions src/Connectors/RabbitMQConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,45 @@
use PhpAmqpLib\Connection\AMQPConnectionConfig;
use PhpAmqpLib\Connection\AMQPConnectionFactory;

class RabbitMQConnector implements ConnectorInterface
final class RabbitMQConnector implements ConnectorInterface
{
private readonly Dispatcher $dispatcher;

public function __construct(Dispatcher $dispatcher)
public function __construct(private readonly Dispatcher $dispatcher)
{
$this->dispatcher = $dispatcher;
}

public function connect(array $config = []): Queue
{
$configs = new AMQPConnectionConfig();
$amqpConnectionConfig = new AMQPConnectionConfig();

// set AMQP account
$configs->setHost(config('queue.connections.rabbitmq.hosts.host'));
$configs->setPort(config('queue.connections.rabbitmq.hosts.port'));
$configs->setUser(config('queue.connections.rabbitmq.hosts.user'));
$configs->setPassword(config('queue.connections.rabbitmq.hosts.password'));
$configs->setVhost(config('queue.connections.rabbitmq.hosts.vhost'));
$amqpConnectionConfig->setHost(config('queue.connections.rabbitmq.hosts.host'));
$amqpConnectionConfig->setPort(config('queue.connections.rabbitmq.hosts.port'));
$amqpConnectionConfig->setUser(config('queue.connections.rabbitmq.hosts.user'));
$amqpConnectionConfig->setPassword(config('queue.connections.rabbitmq.hosts.password'));
$amqpConnectionConfig->setVhost(config('queue.connections.rabbitmq.hosts.vhost'));

$configs->setIsLazy(config('queue.connections.rabbitmq.hosts.lazy'));
$configs->setKeepalive(config('queue.connections.rabbitmq.hosts.keepalive'));
$configs->setHeartbeat(config('queue.connections.rabbitmq.hosts.heartbeat'));
$amqpConnectionConfig->setIsLazy(config('queue.connections.rabbitmq.hosts.lazy'));
$amqpConnectionConfig->setKeepalive(config('queue.connections.rabbitmq.hosts.keepalive'));
$amqpConnectionConfig->setHeartbeat(config('queue.connections.rabbitmq.hosts.heartbeat'));


// set SSL Options
$configs->setSslCaCert(config('queue.connections.rabbitmq.options.ssl_options.cafile'));
$configs->setSslCert(config('queue.connections.rabbitmq.options.ssl_options.local_cert'));
$configs->setSslKey(config('queue.connections.rabbitmq.options.ssl_options.local_key'));
$configs->setSslVerify(config('queue.connections.rabbitmq.options.ssl_options.verify_peer'));
$configs->setSslPassPhrase(config('queue.connections.rabbitmq.options.ssl_options.passphrase'));
$amqpConnectionConfig->setSslCaCert(config('queue.connections.rabbitmq.options.ssl_options.cafile'));
$amqpConnectionConfig->setSslCert(config('queue.connections.rabbitmq.options.ssl_options.local_cert'));
$amqpConnectionConfig->setSslKey(config('queue.connections.rabbitmq.options.ssl_options.local_key'));
$amqpConnectionConfig->setSslVerify(config('queue.connections.rabbitmq.options.ssl_options.verify_peer'));
$amqpConnectionConfig->setSslPassPhrase(config('queue.connections.rabbitmq.options.ssl_options.passphrase'));

// Create AMQP Connection
$connection = AMQPConnectionFactory::create($configs);
$connection = AMQPConnectionFactory::create($amqpConnectionConfig);
$defaultQueue = config('queue.connections.rabbitmq.queue');

$queueConnection = new RabbitQueue($connection, $defaultQueue);
$rabbitQueue = new RabbitQueue($connection, $defaultQueue);

$this->dispatcher->listen(WorkerStopping::class, static function () use ($queueConnection): void {
$queueConnection->close();
$this->dispatcher->listen(WorkerStopping::class, static function () use ($rabbitQueue): void {
$rabbitQueue->close();
});

return $queueConnection;
return $rabbitQueue;
}
}
4 changes: 2 additions & 2 deletions src/Console/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'rabbitmq:consume')]
class ConsumeCommand extends WorkCommand
final class ConsumeCommand extends WorkCommand
{
protected $signature = 'rabbitmq:consume
{connection? : The name of the queue connection to work}
Expand Down Expand Up @@ -49,7 +49,7 @@ public function handle(): void
parent::handle();
}

protected function consumerTag(): string
private function consumerTag(): string
{
if ($consumerTag = $this->option('consumer-tag')) {
return $consumerTag;
Expand Down
74 changes: 35 additions & 39 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,25 @@
use Throwable;
use iamfarhad\LaravelRabbitMQ\RabbitQueue;

class Consumer extends Worker
final class Consumer extends Worker
{
protected Container $container;
private Container $container;

protected string $consumerTag;
private string $consumerTag;

protected int $prefetchSize;
private int $prefetchSize;

protected int $maxPriority;
private int $maxPriority;

protected int $prefetchCount;
private int $prefetchCount;

protected AMQPChannel $channel;
private AMQPChannel $amqpChannel;

private $currentJob;
private ?object $currentJob = null;

public function setContainer(Container $value): void
public function setContainer(Container $container): void
{
$this->container = $value;
$this->container = $container;
}

public function setConsumerTag(string $value): void
Expand Down Expand Up @@ -58,62 +58,60 @@ public function setPrefetchCount(int $value): void
*
* @param string $connectionName
* @param string $queue
* @param WorkerOptions $options
* @return int
*
* @throws Throwable
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
public function daemon($connectionName, $queue, WorkerOptions $workerOptions)
{
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
}

$lastRestart = $this->getTimestampOfLastQueueRestart();

[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
$timestampOfLastQueueRestart = $this->getTimestampOfLastQueueRestart();
$startTime = hrtime(true) / 1e9;
$jobsProcessed = 0;

$connection = $this->manager->connection($connectionName);

$this->channel = $connection->getChannel();
$this->amqpChannel = $connection->getChannel();

$this->channel->basic_qos(
$this->amqpChannel->basic_qos(
$this->prefetchSize,
$this->prefetchCount,
null
);

$jobClass = $connection->getJobClass();
$arguments = [];
if ($this->maxPriority) {
if ($this->maxPriority !== 0) {
$arguments['priority'] = ['I', $this->maxPriority];
}

$this->channel->basic_consume(
$this->amqpChannel->basic_consume(
$queue,
$this->consumerTag,
false,
false,
false,
false,
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void {
function (AMQPMessage $amqpMessage) use ($connection, $workerOptions, $connectionName, $queue, $jobClass, &$jobsProcessed): void {
$job = new $jobClass(
$this->container,
$connection,
$message,
$amqpMessage,
$connectionName,
$queue
);

$this->currentJob = $job;

if ($this->supportsAsyncSignals()) {
$this->registerTimeoutHandler($job, $options);
$this->registerTimeoutHandler($job, $workerOptions);
}

$jobsProcessed++;
++$jobsProcessed;

$this->runJob($job, $connectionName, $options);
$this->runJob($job, $connectionName, $workerOptions);

if ($this->supportsAsyncSignals()) {
$this->resetTimeoutHandler();
Expand All @@ -123,21 +121,21 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
$arguments
);

while ($this->channel->is_consuming()) {
while ($this->amqpChannel->is_consuming()) {
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
if (! $this->daemonShouldRun($workerOptions, $connectionName, $queue)) {
$this->pauseWorker($workerOptions, $timestampOfLastQueueRestart);

continue;
}

// If the daemon should run (not in maintenance mode, etc.), then we can wait for a job.
try {
$this->channel->wait(null, true, (int) $options->timeout);
} catch (AMQPRuntimeException $exception) {
$this->exceptions->report($exception);
$this->amqpChannel->wait(null, true, (int) $workerOptions->timeout);
} catch (AMQPRuntimeException $amqpRuntimeException) {
$this->exceptions->report($amqpRuntimeException);

$this->kill(1);
} catch (Exception | Throwable $exception) {
Expand All @@ -148,15 +146,15 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu

// If no job is got off the queue, we will need to sleep the worker.
if ($this->currentJob === null) {
$this->sleep($options->sleep);
$this->sleep($workerOptions->sleep);
}

// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$status = $this->stopIfNecessary(
$options,
$lastRestart,
$workerOptions,
$timestampOfLastQueueRestart,
$startTime,
$jobsProcessed,
$this->currentJob
Expand All @@ -173,21 +171,19 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
/**
* Determine if the daemon should process on this iteration.
*
* @param WorkerOptions $options
* @param string $connectionName
* @param string $queue
* @return bool
*/
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue): bool
protected function daemonShouldRun(WorkerOptions $workerOptions, $connectionName, $queue): bool
{
return ! ((($this->isDownForMaintenance)() && ! $options->force) || $this->paused);
return !(($this->isDownForMaintenance)() && ! $workerOptions->force) && !$this->paused;
}

public function stop($status = 0, $options = []): int
{
// Tell the server you are going to stop consuming.
// It will finish up the last message and not send you anymore.
$this->channel->basic_cancel($this->consumerTag, false, true);
$this->amqpChannel->basic_cancel($this->consumerTag, false, true);

return parent::stop($status);
}
Expand Down
Loading

0 comments on commit 8a11086

Please sign in to comment.