-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Topic policy reader can't recover when get any exception. #17562
Conversation
This reverts commit c28444c.
This reverts commit 64165c5.
initPolicesCache(reader, result); | ||
result.thenRun(() -> readMorePolicies(reader)); | ||
} | ||
readerCompletableFuture.thenAccept(reader -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use thenAccept
to avoid using the null value in whenComplete
again. because we will pass the result
future to the next operation. So, we don't need thenCompose
if (ex != null) { | ||
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); | ||
result.completeExceptionally(ex); | ||
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the current time reader is null, So we can't clean the cache and close the reader.
result.thenRun(() -> readMorePolicies(reader)); | ||
}).exceptionally(ex -> { | ||
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); | ||
cleanCacheAndCloseReader(namespace, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method won't have any chance to throw an exception. I think it's safe enough and don't need try-catch.
readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync); | ||
readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync) | ||
.exceptionally(ex -> { | ||
log.warn("[{}] Close change_event reader fail.", namespace, ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a log to help debug. Maybe we can introduce more mechanisms to clean up the resource. But it's not the current PR focus.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
apache#17562) (cherry picked from commit 5aa1f11) (cherry picked from commit c844c20)
apache#17562) (cherry picked from commit 5aa1f11)
…rge request !44) [fix][broker] Topic policy reader can't recover when get any exception (apache#17562)
apache#17562) (cherry picked from commit 5aa1f11)
…exception (apache#17562)" This reverts commit c3b49ae.
@mattisonchao @zymap Do we have any plan to release 2.8.5 shortly? |
Fix #15891
Motivation
When creating the topic policy reader got any problem(maybe a network problem), and we can not clear the current state, which will let the broker can not retry to init the topic policy reader again until we unload the namespace.
Modifications
Verifying this change
I think it's not a logical error. we don't need to add the unit test.
Documentation
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)