Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve][txn] Implementation of Delayed Transaction Messages (#17548)
link #17548 ### Motivation now delayed features and transaction messages cannot be used together. When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers. Code, eg. ``` @test public void testDelayedTransactionMessages() throws Exception { String topic = NAMESPACE1 + "/testDelayedTransactionMessages"; @cleanup Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("shared-sub") .subscriptionType(SubscriptionType.Shared) .subscribe(); @cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(false) .create(); Transaction transaction = pulsarClient.newTransaction() .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); // send delayed messages for (int i = 0; i < 10; i++) { producer.newMessage(transaction) .value("msg-" + i) .deliverAfter(5, TimeUnit.SECONDS) .sendAsync(); } producer.flush(); transaction.commit().get(); Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS); // the msg now is not null assertNull(msg); } ``` This PR will implement clients to send delayed messages with transactions. ### Modifications make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction. It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md). Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened ### Verifying this change add the test
- Loading branch information