-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Broker] PIP:84 Redeliver command add epoch. #10478
[Broker] PIP:84 Redeliver command add epoch. #10478
Conversation
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
# Conflicts: # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -604,6 +605,8 @@ message CommandCloseConsumer { | |||
message CommandRedeliverUnacknowledgedMessages { | |||
required uint64 consumer_id = 1; | |||
repeated MessageIdData message_ids = 2; | |||
optional uint64 consumer_epoch = 3 [default = 0]; | |||
optional uint64 request_id = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the request_id, looks like we don't have a redelivery response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@congbobo184 do we need this one?
* @see #readEntries(int) | ||
*/ | ||
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, | ||
Object ctx, PositionImpl maxPosition); | ||
Object ctx, PositionImpl maxPosition, long epoch); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move the epoch into the ctx? I think it should be a context or a carrier, if want to add more ctx in the future, we do not want to add more params, maybe we can try to create a ReadEntryContext(with recycle), the consumer ref, and the epoch should ship to the ReadEntryContext
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
…ver_add_epoch_pip84 # Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java # pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java # pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
…ver_add_epoch_pip84 # Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
…ver_add_epoch_pip84 # Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great work
I left some minor comments
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
Outdated
Show resolved
Hide resolved
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
Outdated
Show resolved
Hide resolved
@@ -526,6 +529,7 @@ message CommandMessage { | |||
required MessageIdData message_id = 2; | |||
optional uint32 redelivery_count = 3 [default = 0]; | |||
repeated int64 ack_set = 4; | |||
optional uint64 consumer_epoch = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default epoch is -1L, but the consumer_epoch
defined in pb is uint64, it will be the max value of uint64?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To prevent use of epoch in other ways in future so we set this to uint64
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
// we need to keep both epochs the same | ||
if (conf.getSubscriptionType() == SubscriptionType.Failover | ||
|| conf.getSubscriptionType() == SubscriptionType.Exclusive) { | ||
CONSUMER_EPOCH.incrementAndGet(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For MultiTopicsConsumer, it increased CONSUMER_EPOCH, and then call redeliverUnacknowledgedMessages
for each consumer, will it lead epoch in consistent between consumers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
every ConsumerImpl
in MultiTopicsConsumerImpl
will increase the CONSUMER_EPOCH, they will change at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, as soon as other comments from other reviewers are addressed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job.
apache/pulsar#10478 introduced an API change to `MessageImpl#create`, which is included in 2.10.0.0-rc4, we need to fix the conflict.
apache/pulsar#10478 introduced an API change to `MessageImpl#create`, which is included in 2.10.0.0-rc4, we need to fix the conflict. (cherry picked from commit 857a379)
## Motivation detail in https://github.com/apache/pulsar/wiki/PIP-84-%3A-Pulsar-client%3A-Redeliver-command-add-epoch. ### Verifying this change Add the tests for it Does this pull request potentially affect one of the following parts: If yes was chosen, please highlight the changes
#10478 ### Motivation since #10478 merged, we should change the cumulative ack with transaction abort redeliver logic. We can't redeliver unCumulativeAck message by the server because the client will receive the new message and ack then will receive the old message they abort. in this case: 1. we have 5 message 2. cumulative ack 3 messages with the transaction 3. we abort this transaction 4. server redeliver message by the current consumer_epoch 5. the client will not filter the 4 or 5 messages, because in #10478 we don't change the client consumer epoch 6. client cumulative ack 4 5 with transaction and commit will lose the 1 2 3 messages and the consume message, not in order. ### Modifications don't redeliver any cumulative ack messages, it will do by user self
apache#10478 ### Motivation since apache#10478 merged, we should change the cumulative ack with transaction abort redeliver logic. We can't redeliver unCumulativeAck message by the server because the client will receive the new message and ack then will receive the old message they abort. in this case: 1. we have 5 message 2. cumulative ack 3 messages with the transaction 3. we abort this transaction 4. server redeliver message by the current consumer_epoch 5. the client will not filter the 4 or 5 messages, because in apache#10478 we don't change the client consumer epoch 6. client cumulative ack 4 5 with transaction and commit will lose the 1 2 3 messages and the consume message, not in order. ### Modifications don't redeliver any cumulative ack messages, it will do by user self
Motivation
detail in https://github.com/apache/pulsar/wiki/PIP-84-%3A-Pulsar-client%3A-Redeliver-command-add-epoch.
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (yes)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)