-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathDelayRedeliveredMessageExtension.php
62 lines (50 loc) · 1.88 KB
/
DelayRedeliveredMessageExtension.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?php
namespace Enqueue\Client\ConsumptionExtension;
use Enqueue\Client\DriverInterface;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\MessageReceivedExtensionInterface;
use Enqueue\Consumption\Result;
class DelayRedeliveredMessageExtension implements MessageReceivedExtensionInterface
{
public const PROPERTY_REDELIVER_COUNT = 'enqueue.redelivery_count';
/**
* @var DriverInterface
*/
private $driver;
/**
* The number of seconds the message should be delayed.
*
* @var int
*/
private $delay;
/**
* @param int $delay The number of seconds the message should be delayed
*/
public function __construct(DriverInterface $driver, $delay)
{
$this->driver = $driver;
$this->delay = $delay;
}
public function onMessageReceived(MessageReceived $context): void
{
$message = $context->getMessage();
if (false == $message->isRedelivered()) {
return;
}
if (false != $context->getResult()) {
return;
}
$delayedMessage = $this->driver->createClientMessage($message);
// increment redelivery count
$redeliveryCount = (int) $delayedMessage->getProperty(self::PROPERTY_REDELIVER_COUNT, 0);
$delayedMessage->setProperty(self::PROPERTY_REDELIVER_COUNT, $redeliveryCount + 1);
$delayedMessage->setDelay($this->delay);
$this->driver->sendToProcessor($delayedMessage);
$context->getLogger()->debug('[DelayRedeliveredMessageExtension] Send delayed message');
$context->setResult(Result::reject('A new copy of the message was sent with a delay. The original message is rejected'));
$context->getLogger()->debug(
'[DelayRedeliveredMessageExtension] '.
'Reject redelivered original message by setting reject status to context.'
);
}
}