[improve][client]PIP-359:Support custom message listener executor for specific subscription #22861
A simpler solution to your problem could be to add
The caller would be responsible for the lifecycle of the Executor, and it would be used only for this specific consumer.
This isn't directly related to this feature request, but I thought that it might be useful to share. There's "PIP-234: Support using shared thread pool across multiple Pulsar client instance" https://lists.apache.org/thread/5jw06hqlmwnrgvbn9lfom1vkwhwqwwd4 . This hasn't been implemented yet. There's also a completely unrelated challenge when a single client is shared. It's about the rate limiting and backpressure handling. That is mentioned in the out-of-scope part of PIP-322. Pulsar will share the same set of connections across all producers and consumers. When a producer or consumer is rate limited, it will impact other producers and consumers using the same connection. |
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
Yes, this is a problem. Messages get queued in the executor thread pool as they arrive.
Yes, configuring receiver queue sizes seems to work, but in actual scenarios, it is difficult to accurately determine the priority of each consumer and set a reasonable queue size. In addition, this priority may change at any time as the business changes. Therefore, this solution is very difficult to operate and maintain and is not the best solution. |
Looks great, hope it can be implemented soon! |
I added some further comments to simplify MessageListenerExecutor. The lifecycle is managed by the application that provides the instance, and that's why the interface shouldn't expose any details of the executor. Behind the MessageListenerExecutor there might be multiple different queues / thread pools.
Passing the message instance allows using the key or any other property of the message to make decisions when needed, for example in an executor that uses a priority queue implementation.
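To illustrate why passing the message matters, here is a minimal sketch of an executor that picks a worker from the message key. `Msg` and `ListenerExecutor` are hypothetical stand-ins written for this example, not the Pulsar API; the only assumption is that the executor receives the message together with the listener task, as suggested above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a message; only the key matters for this sketch.
final class Msg {
    final String key;
    final String payload;

    Msg(String key, String payload) {
        this.key = key;
        this.payload = payload;
    }
}

// Hypothetical stand-in: the executor is handed the message and the listener task.
interface ListenerExecutor {
    void execute(Msg message, Runnable task);
}

// Routes all messages with the same key to the same single-threaded lane,
// so per-key order is kept while different keys may run in parallel.
final class KeyHashListenerExecutor implements ListenerExecutor {
    private final ExecutorService[] lanes;

    KeyHashListenerExecutor(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Messages without a key go to lane 0 (an arbitrary choice for this sketch).
    int laneFor(Msg message) {
        return message.key == null ? 0 : Math.floorMod(message.key.hashCode(), lanes.length);
    }

    @Override
    public void execute(Msg message, Runnable task) {
        lanes[laneFor(message)].execute(task);
    }

    // The application created the lanes, so the application shuts them down.
    void shutdown() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class KeyHashListenerExecutorDemo {
    public static void main(String[] args) {
        KeyHashListenerExecutor executor = new KeyHashListenerExecutor(4);
        for (int i = 0; i < 3; i++) {
            Msg msg = new Msg("order-1", "payload-" + i);
            executor.execute(msg, () -> System.out.println(msg.key + " -> " + msg.payload));
        }
        executor.shutdown();
    }
}
```

Nothing about threads or queues leaks through the interface: a priority-queue-backed implementation could replace `KeyHashListenerExecutor` without the caller noticing.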
This change will also need a PIP. Please check for other PIPs on the mailing list for an example https://lists.apache.org/[email protected] . You can find mailing list joining instructions at https://pulsar.apache.org/contact/ .
The PIP template is at https://github.com/apache/pulsar/blob/master/pip/TEMPLATE.md .
Before the PIP is accepted, we cannot merge implementation PRs.
Thanks for this proposal @AuroraTwinkle . I believe that this will be a very useful addition to the Pulsar client. There are certain details of the Message Listener support in the Pulsar client that aren't handled currently which could lead to unnecessary duplicate messages. That's something that isn't directly related to the MessageListenerExecutor. |
Okay, thanks for your prompt reply, I will try my best!
Okay, I need some time to fully understand it. |
@liangyepianzhou Please don't close and reopen PRs when that isn't needed. Primarily re-running in GitHub UI (for committers) and commenting "/pulsarbot rerun-failure-checks" on the PR should be used. Closing and reopening is fine when the previous PR build is over 3 days old and the cached artifacts have expired or when there's a specific need to pick up latest changes from master branch. If closing and reopening is used as the general approach for retrying, it will waste a lot of CI resources since all builds are re-run. It's possible to continue iterating on the solution in builds in the forked repository without any limits. In this case, this PR requires a PIP. It doesn't make sense to keep on running the jobs in apache/pulsar until the request is resolved and this PR is really ready for merging. |
/pulsarbot rerun-failure-checks |
LGTM
Before merging, I'll close and reopen this PR to ensure that CI passes with latest changes in master branch. |
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #22861 +/- ##
============================================
+ Coverage 73.57% 74.10% +0.52%
- Complexity 32624 33939 +1315
============================================
Files 1877 1919 +42
Lines 139502 150946 +11444
Branches 15299 17279 +1980
============================================
+ Hits 102638 111852 +9214
- Misses 28908 30521 +1613
- Partials 7956 8573 +617
… specific subscription (apache#22861) Co-authored-by: duanlinlin <[email protected]>
… specific subscription (#22861) Co-authored-by: duanlinlin <[email protected]> (cherry picked from commit 10f4e02)
@liangyepianzhou Did we make a decision to cherry-pick this to branch-3.0 ? PIPs don't get cherry-picked by default to maintenance branches. |
@lhotari This has been discussed on the mailing list as usual. |
thanks, it's good to have the URL of that discussion recorded here. This is addressed. |
… specific subscription (apache#22861) Co-authored-by: duanlinlin <[email protected]> (cherry picked from commit 10f4e02) (cherry picked from commit c5846bb)
PIP-359
Support a custom message listener thread pool for a specific subscription, so that one subscription's listener taking too long does not increase consumption delay for other subscriptions.
Motivation
In our scenario, there is a centralized message proxy service. This service uses the same PulsarClient instance to create many subscription groups that consume many topics and cache messages locally. The business then pulls messages from the proxy service's cache. This appears to work, but in practice we found that when the message processing time of a few consumer groups (in listener mode) is very high, it affects almost all consumer groups handled by the proxy service, causing a large number of message delays.
By analyzing the source code, we found that by default all consumer instances created from the same PulsarClient share one thread pool for message listeners, and sometimes multiple consumers' message listeners are bound to the same thread. When one consumer blocks for a long time while processing messages, the messages of other consumers bound to that thread cannot be processed in time, resulting in message delays. For this scenario, it may therefore be necessary to support a dedicated message listener thread pool at the consumer level, so that different consumers do not affect each other.
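The blocking effect described above can be reproduced with plain `java.util.concurrent` executors. The sketch below is an illustration only and does not use the Pulsar client: a single-threaded executor stands in for "two listeners bound to the same thread", and the class and method names are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ListenerIsolationDemo {

    // Submits a blocking "slow listener" task, then a trivial "fast listener" task.
    // Returns true if the fast task completed while the slow one was still blocked.
    static boolean fastFinishesWhileSlowBlocked(ExecutorService slowPool, ExecutorService fastPool) {
        CountDownLatch releaseSlow = new CountDownLatch(1);
        CountDownLatch fastDone = new CountDownLatch(1);

        // Slow listener: holds its thread until released.
        slowPool.execute(() -> {
            try {
                releaseSlow.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Fast listener: completes immediately once it gets a thread.
        fastPool.execute(fastDone::countDown);

        boolean finished;
        try {
            finished = fastDone.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        } finally {
            releaseSlow.countDown(); // always unblock the slow task so the pools can shut down
        }
        return finished;
    }

    public static void main(String[] args) {
        // Both listeners share one thread: the fast one is stuck behind the slow one.
        ExecutorService shared = Executors.newSingleThreadExecutor();
        System.out.println("shared thread, fast listener ran in time: "
                + fastFinishesWhileSlowBlocked(shared, shared));
        shared.shutdown();

        // Each listener has its own executor: the fast one is unaffected.
        ExecutorService slowOnly = Executors.newSingleThreadExecutor();
        ExecutorService fastOnly = Executors.newSingleThreadExecutor();
        System.out.println("dedicated executors, fast listener ran in time: "
                + fastFinishesWhileSlowBlocked(slowOnly, fastOnly));
        slowOnly.shutdown();
        fastOnly.shutdown();
    }
}
```

With a shared thread the fast task stays queued behind the blocked one until the timeout expires; with dedicated executors it completes right away, which is the isolation this PR aims to make configurable per subscription.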
Modifications
Support a custom message listener thread pool for a specific subscription.
Matching PR in forked repository
PR in forked repository: AuroraTwinkle#1