Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][txn] Implementation of Delayed Transaction Messages #17548

Conversation

congbobo184
Copy link
Contributor

@congbobo184 congbobo184 commented Sep 8, 2022

link #17534

Motivation

now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.

Code, eg.

    @Test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @Cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // the msg now is not null
        assertNull(msg);
    }

This PR will implement clients to send delayed messages with transactions.

Modifications

make transaction message can be put in trackDelayedDelivery to implement client send delayed messages with the transaction.

It is worth noting that the dispatcher sends transaction messages to consumers and should follow the MaxReadPosition change—(something about MaxReadPosition https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).

Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened

Verifying this change

add the test

@congbobo184 congbobo184 added type/feature The PR added a new feature or issue requested a new feature area/transaction doc-not-needed Your PR changes do not impact docs release/2.9.4 release/2.11.0 release/2.10.3 labels Sep 8, 2022
@congbobo184 congbobo184 added this to the 2.11.0 milestone Sep 8, 2022
@congbobo184 congbobo184 self-assigned this Sep 8, 2022
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@Jason918 Jason918 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@congbobo184 congbobo184 merged commit 1246d79 into apache:master Sep 15, 2022
congbobo184 added a commit that referenced this pull request Nov 9, 2022
link #17548
### Motivation
now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.

Code, eg.
```
    @test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // the msg now is not null
        assertNull(msg);
    }
```
This PR will implement clients to send delayed messages with transactions.

### Modifications
make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.

It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).

Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened

### Verifying this change
add the test

(cherry picked from commit 1246d79)
@congbobo184 congbobo184 added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Nov 9, 2022
Technoboy- pushed a commit that referenced this pull request Nov 15, 2022
link #17548
### Motivation
now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.

Code, eg.
```
    @test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // the msg now is not null
        assertNull(msg);
    }
```
This PR will implement clients to send delayed messages with transactions.

### Modifications
make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.

It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md). 

Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened

### Verifying this change
add the test
congbobo184 added a commit that referenced this pull request Nov 26, 2022
link #17548
### Motivation
now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.

Code, eg.
```
    @test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // the msg now is not null
        assertNull(msg);
    }
```
This PR will implement clients to send delayed messages with transactions.

### Modifications
make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.

It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).

Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened

### Verifying this change
add the test

(cherry picked from commit 1246d79)
liangyepianzhou pushed a commit that referenced this pull request Dec 5, 2022
link #17548
### Motivation
now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.

Code, eg.
```
    @test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // the msg now is not null
        assertNull(msg);
    }
```
This PR will implement clients to send delayed messages with transactions.

### Modifications
make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.

It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).

Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened

### Verifying this change
add the test

(cherry picked from commit 1246d79)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Dec 6, 2022
…#17548)

link apache#17548
### Motivation
now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.

Code, eg.
```
    @test
    public void testDelayedTransactionMessages() throws Exception {
        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

        @cleanup
        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("shared-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        @cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(false)
                .create();

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

        // send delayed messages
        for (int i = 0; i < 10; i++) {
            producer.newMessage(transaction)
                    .value("msg-" + i)
                    .deliverAfter(5, TimeUnit.SECONDS)
                    .sendAsync();
        }

        producer.flush();

        transaction.commit().get();

        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
        // the msg now is not null
        assertNull(msg);
    }
```
This PR will implement clients to send delayed messages with transactions.

### Modifications
make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.

It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).

Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened

### Verifying this change
add the test

(cherry picked from commit 1246d79)
(cherry picked from commit fc71323)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants