-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feat][broker]: intercept transaction begin and end events for observability #14613
Conversation
@madhavan-narayanan:Thanks for your contribution. For this PR, do we need to update docs? |
@madhavan-narayanan:Thanks for providing doc info! |
Could we add these interceptor methods in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please help add tests for covering the new changes.
* @param tcId Transaction Coordinator Id | ||
* @param txnID Transaction ID | ||
*/ | ||
default void beginTxn(long tcId, String txnID) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use newTxn to keep consistent with the command.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And from the implementation, I think it should be txnOpened
and txnEnded
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(), | ||
txnID.getMostSigBits())); | ||
if (getBrokerService().getInterceptor() != null) { | ||
getBrokerService().getInterceptor().beginTxn(command.getTcId(), txnID.toString()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to move to PulsarCommandSender
so that we can have a unified management of sent commands and interception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @codelipenghui ,
I have added unit tests and refactored code as you had recommended. Can you please review again?
Currently, there is a method |
Hi @gaoran10 , |
@madhavan-narayanan Maybe we could add a new param |
@gaoran10 , I understand your concern. I can add another overloaded method for 'onPulsarCommand' with additional 'context' parameter to avoid proliferation of callbacks in the future. But is the opaque 'Object context' any concern? Developers implementing the interceptor have to understand the structure and semantics of the context by browsing the broker code or from the documentation of 'onPulsarCommand' which I am afraid can become unwieldy and ambiguous later. |
@madhavan-narayanan Ok, the method |
…ability (apache#14613) ### Motivation This is to provide visibility to cluster operators when transactions are handled by the broker. Currently there is no way to know when a transaction begins and the produce/ack operations contained in it and the end status of the transaction (commit/rollback). Without this information, it is tough to support end customers where there are queries about missing messages or failed processing
…ability (apache#14613) ### Motivation This is to provide visibility to cluster operators when transactions are handled by the broker. Currently there is no way to know when a transaction begins and the produce/ack operations contained in it and the end status of the transaction (commit/rollback). Without this information, it is tough to support end customers where there are queries about missing messages or failed processing
…ability (apache#14613) This is to provide visibility to cluster operators when transactions are handled by the broker. Currently there is no way to know when a transaction begins and the produce/ack operations contained in it and the end status of the transaction (commit/rollback). Without this information, it is tough to support end customers where there are queries about missing messages or failed processing
…ability (apache#14613) This is to provide visibility to cluster operators when transactions are handled by the broker. Currently there is no way to know when a transaction begins and the produce/ack operations contained in it and the end status of the transaction (commit/rollback). Without this information, it is tough to support end customers where there are queries about missing messages or failed processing
…ability (apache#14613) This is to provide visibility to cluster operators when transactions are handled by the broker. Currently there is no way to know when a transaction begins and the produce/ack operations contained in it and the end status of the transaction (commit/rollback). Without this information, it is tough to support end customers where there are queries about missing messages or failed processing
(If this PR fixes a github issue, please add
Fixes #<xyz>
.)Fixes #
(or if this PR is one task of a github issue, please add
Master Issue: #<xyz>
to link to the master issue.)Master Issue: #12858
Fixes: #12858
Motivation
This is to provide visibility to cluster operators when transactions are handled by the broker. Currently there is no way to know when a transaction begins and the produce/ack operations contained in it and the end status of the transaction (commit/rollback). Without this information, it is tough to support end customers where there are queries about missing messages or failed processing
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)