-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
101 features: msg properties, msg key, timer msg, trans msg.
- Loading branch information
aliyunmq
committed
Jun 10, 2019
1 parent
41d890c
commit 71e438b
Showing
14 changed files
with
328 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
<?php | ||
namespace MQ; | ||
|
||
use MQ\Exception\InvalidArgumentException; | ||
use MQ\Http\HttpClient; | ||
use MQ\Requests\ConsumeMessageRequest; | ||
use MQ\Requests\AckMessageRequest; | ||
use MQ\Responses\AckMessageResponse; | ||
use MQ\Responses\ConsumeMessageResponse; | ||
|
||
class MQTransProducer extends MQProducer | ||
{ | ||
private $groupId; | ||
|
||
function __construct(HttpClient $client, $instanceId = NULL, $topicName, $groupId) | ||
{ | ||
if (empty($groupId)) { | ||
throw new InvalidArgumentException(400, "GroupId is null"); | ||
} | ||
parent::__construct($client, $instanceId, $topicName); | ||
$this->groupId = $groupId; | ||
} | ||
|
||
/** | ||
* consume transaction half message | ||
* | ||
* @param $numOfMessages: consume how many messages once, 1~16 | ||
* @param $waitSeconds: if > 0, means the time(second) the request holden at server if there is no message to consume. | ||
* If <= 0, means the server will response back if there is no message to consume. | ||
* It's value should be 1~30 | ||
* | ||
* @return Message | ||
* | ||
* @throws TopicNotExistException if queue does not exist | ||
* @throws MessageNotExistException if no message exists | ||
* @throws InvalidArgumentException if the argument is invalid | ||
* @throws MQException if any other exception happends | ||
*/ | ||
public function consumeHalfMessage($numOfMessages, $waitSeconds = -1) | ||
{ | ||
if ($numOfMessages < 0 || $numOfMessages > 16) { | ||
throw new InvalidArgumentException(400, "numOfMessages should be 1~16"); | ||
} | ||
if ($waitSeconds > 30) { | ||
throw new InvalidArgumentException(400, "numOfMessages should less then 30"); | ||
} | ||
$request = new ConsumeMessageRequest($this->instanceId, $this->topicName, $this->groupId, $numOfMessages, $this->messageTag, $waitSeconds); | ||
$request->setTrans(Constants::TRANSACTION_POP); | ||
$response = new ConsumeMessageResponse(); | ||
return $this->client->sendRequest($request, $response); | ||
} | ||
|
||
/** | ||
* commit transaction message | ||
* | ||
* @param $receiptHandle: | ||
* $receiptHandle, which is got from consumeHalfMessage or publishMessage | ||
* | ||
* @return AckMessageResponse | ||
* | ||
* @throws TopicNotExistException if queue does not exist | ||
* @throws ReceiptHandleErrorException if the receiptHandle is invalid | ||
* @throws InvalidArgumentException if the argument is invalid | ||
* @throws AckMessageException if any message not deleted | ||
* @throws MQException if any other exception happends | ||
*/ | ||
public function commit($receiptHandle) | ||
{ | ||
$request = new AckMessageRequest($this->instanceId, $this->topicName, $this->groupId, array($receiptHandle)); | ||
$request->setTrans(Constants::TRANSACTION_COMMIT); | ||
$response = new AckMessageResponse(); | ||
return $this->client->sendRequest($request, $response); | ||
} | ||
|
||
|
||
/** | ||
* rollback transaction message | ||
* | ||
* @param $receiptHandle: | ||
* $receiptHandle, which is got from consumeHalfMessage or publishMessage | ||
* | ||
* @return AckMessageResponse | ||
* | ||
* @throws TopicNotExistException if queue does not exist | ||
* @throws ReceiptHandleErrorException if the receiptHandle is invalid | ||
* @throws InvalidArgumentException if the argument is invalid | ||
* @throws AckMessageException if any message not deleted | ||
* @throws MQException if any other exception happends | ||
*/ | ||
public function rollback($receiptHandle) | ||
{ | ||
$request = new AckMessageRequest($this->instanceId, $this->topicName, $this->groupId, array($receiptHandle)); | ||
$request->setTrans(Constants::TRANSACTION_ROLLBACK); | ||
$response = new AckMessageResponse(); | ||
return $this->client->sendRequest($request, $response); | ||
} | ||
} | ||
|
||
?> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.