Skip to content

Commit

Permalink
[improve][txn] Implementation of Delayed Transaction Messages (apache…
Browse files Browse the repository at this point in the history
…#17548)

link apache#17548
### Motivation
now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.

Code, eg.
```
    @test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // the msg now is not null
        assertNull(msg);
    }
```
This PR will implement clients to send delayed messages with transactions.

### Modifications
make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.

It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).

Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened

### Verifying this change
add the test

(cherry picked from commit 1246d79)
(cherry picked from commit fc71323)
  • Loading branch information
congbobo184 authored and nicoloboschi committed Dec 6, 2022
1 parent 71c3b25 commit c3747d8
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
entry.release();
continue;
}
} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
}

if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -1276,4 +1278,63 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception {
assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
}

@Test
public void testDelayedTransactionMessages() throws Exception {
String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

@Cleanup
Consumer<String> failoverConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("failover-sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create();

Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
for (int i = 0; i < 10; i++) {
producer.newMessage(transaction)
.value("msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.sendAsync();
}

producer.flush();

transaction.commit().get();

// Failover consumer will receive the messages immediately while
// the shared consumer will get them after the delay
Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
assertNull(msg);

for (int i = 0; i < 10; i++) {
msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
assertEquals(msg.getValue(), "msg-" + i);
}

Set<String> receivedMsgs = new TreeSet<>();
for (int i = 0; i < 10; i++) {
msg = sharedConsumer.receive(10, TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
}

assertEquals(receivedMsgs.size(), 10);
for (int i = 0; i < 10; i++) {
assertTrue(receivedMsgs.contains("msg-" + i));
}
}
}

0 comments on commit c3747d8

Please sign in to comment.