-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RePause All Partitions After Rebalance if user paused any and requested #363
Conversation
@Gin2022Null Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@Gin2022Null Thank you for signing the Contributor License Agreement! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution; LGTM, except we should use boolean
instead of Boolean
.
Resolves #307
In order to achieve graceful shutdown during deployment, We applied the following strategy:
But when new consumers join the consumer group, after rebalance, old consumers wake up, if the newly assigned partitions are not in the pausedByUser, consumer start poll messages which gracefully shutdown are not fully achieved.
Add pauseAllAfterRebalance configuration
When a user pauses topics/partitions before rebalancing, the behavior depends on the value of pauseAllAfterRebalance.If it is set to False, the paused topics/partitions will remain paused after the rebalance. However, if it is set to True, all assigned topics/partitions will be paused after the rebalance.
Test:
ReceiverOptions.create(props).pauseAllAfterRebalance(Boolean.TRUE);
Consumer-1 assigned partitions: testTopic-2
Consumer-2 assigned partitions: testTopic-0, testTopic3-1
Consumer-2 Pause Topic/Partitions
Kafka Consumer Template status=PAUSE topic=testTopic, partition=0
Kafka Consumer Template status=PAUSE topic=testTopic, partition=1
Consumer-3 join - rebalance
Consumer-2 Revoke previously assigned partitions testTopic-0, testTopic-1
Assignment(partitions=[testTopic-2])}
testTopic-2 paused.