[fix][tableview] fixed ack failure in ReaderImpl due to null messageId #17728
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@DataProvider(name = "partitionedTopic")
public static Object[][] partitioned() {
    return new Object[][] {{true}, {false}};
When I run the test on my laptop, the partitioned-topic case always passes, even without the fix.
The non-partitioned-topic case behaves as expected: without the fix it fails and I see the warning log below, and after applying the fix it passes.
2022-09-20T20:05:46,640 - WARN - [pulsar-client-internal-39-1:ReaderImpl@184] - [persistent://public/default/tableview-no-partition-ack-test][reader-57f0eede9d] acknowledge message null cumulative fail.
org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Cannot handle message with null messageId
at org.apache.pulsar.client.impl.ConsumerBase.validateMessageId(ConsumerBase.java:358) ~[classes/:?]
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[classes/:?]
at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732) ~[?:?]
at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$ktU1uSvS.invokeWithArguments(Unknown Source) ~[?:?]
at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45) ~[mockito-core-3.12.4.jar:?]
at org.mockito.Answers.answer(Answers.java:99) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82) ~[mockito-core-3.12.4.jar:?]
at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151) ~[mockito-core-3.12.4.jar:?]
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[classes/:?]
at org.apache.pulsar.client.impl.ReaderImpl.lambda$readNextAsync$3(ReaderImpl.java:183) ~[classes/:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalReceiveAsync$4(ConsumerImpl.java:487) ~[classes/:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Yes, the partitioned-topic case should pass without this fix.
Without the fix, however, the test assertion should fail for the non-partitioned topic.
Please let me know if you see different behavior.
@heesung-sn Yes, the same behavior.
        return null;
    });
}
return consumer.receiveAsync().thenApply(msg -> {
How does this fix work? Your error message says msg is not null but msg.getMessageId() is null.
Unless I'm reading this wrong, your change is cleaner but the effect remains the same, barring very unexpected behaviors (for example, consumer.acknowledgeCumulativeAsync(msg) throwing an exception, which before the change was silently discarded).
If msg.getMessageId() somehow is null, you would have the same error, no?
Here, the modification is to return the CompletableFuture produced downstream by thenApply(msg -> { acknowledgeCumulativeAsync ... }) instead of receiveFuture, as MultiTopicsReaderImpl.readNextAsync() does.
The messageId can be null after message.release().
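The ordering argument in this reply can be illustrated with plain JDK futures. This is only a sketch: FakeMessage and readNext are hypothetical stand-ins, not Pulsar classes, and the "ack" is simulated by recording an event. It shows that when the caller receives the stage derived by thenApply, the caller's release callback can only run after the ack step has read the message ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AckOrderingSketch {
    // Hypothetical stand-in for a message whose ID becomes null once released.
    static final class FakeMessage {
        private String messageId = "1:0";
        String getMessageId() { return messageId; }
        void release() { messageId = null; }
    }

    // Returns the derived stage, so any caller callback is ordered after the simulated ack.
    static CompletableFuture<FakeMessage> readNext(CompletableFuture<FakeMessage> receiveFuture,
                                                   List<String> events) {
        return receiveFuture.thenApply(msg -> {
            events.add("ack " + msg.getMessageId()); // simulated acknowledgeCumulativeAsync
            return msg;
        });
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        CompletableFuture<FakeMessage> receiveFuture = new CompletableFuture<>();

        // The caller (TableViewImpl in the PR) releases the message in its own callback.
        readNext(receiveFuture, events).thenAccept(msg -> {
            msg.release();
            events.add("released");
        });

        receiveFuture.complete(new FakeMessage());
        System.out.println(events); // [ack 1:0, released]
    }
}
```

If readNext returned receiveFuture itself, both callbacks would hang off the same future, and the JDK does not specify the order in which such sibling callbacks run, so the release could happen before the ack reads the ID.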
ConsumerBase consumer = (ConsumerBase)
        FieldUtils.readDeclaredField(reader, "consumer", true);
ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>
        pendingReceives = (ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>)
        FieldUtils.readField(consumer, "pendingReceives", true);
CompletableFuture<Message<byte[]>> future = pendingReceives.peek();
This will break the reader.readNextAsync().cancel() behavior, because users will not reach into the pending receives the way this test does. I think the reason is that the future instance in the pending receives is not the same one that is returned to the user.
I have tried the following, and it should work for this case.
public CompletableFuture<Message<T>> readNextAsync() {
    CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
    CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
        consumer.acknowledgeCumulativeAsync(msg)
                .exceptionally(ex -> {
                    log.error("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
                            getConsumer().getSubscription(), msg.getMessageId(), ex);
                    return null;
                });
        return msg;
    });
    CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
    handler.attachToFuture(result);
    handler.setCancelAction(() -> originalFuture.cancel(false));
    return result;
}
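The reason a cancel action is needed can be seen with JDK futures alone: cancelling a stage derived with thenApply does not cancel the future it was derived from. The sketch below is an assumption-laden illustration, not the Pulsar implementation; withCancelPropagation is a hypothetical helper that forwards cancellation with whenComplete, standing in for what CompletableFutureCancellationHandler does in the suggestion above.

```java
import java.util.concurrent.CompletableFuture;

public class CancelPropagationSketch {
    // Hypothetical helper: derives a stage from source and forwards a cancel
    // of the derived stage back to source.
    static <T> CompletableFuture<T> withCancelPropagation(CompletableFuture<T> source) {
        CompletableFuture<T> derived = source.thenApply(v -> v);
        derived.whenComplete((value, error) -> {
            if (derived.isCancelled()) {
                source.cancel(false);
            }
        });
        return derived;
    }

    public static void main(String[] args) {
        // Without propagation, the source stays pending after the derived stage is cancelled.
        CompletableFuture<String> plainSource = new CompletableFuture<>();
        CompletableFuture<String> plainDerived = plainSource.thenApply(v -> v);
        plainDerived.cancel(false);
        System.out.println(plainSource.isCancelled()); // false

        // With propagation, cancelling the derived stage also cancels the source.
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> derived = withCancelPropagation(source);
        derived.cancel(false);
        System.out.println(source.isCancelled()); // true
    }
}
```

In the reader, the source corresponds to the future held in the consumer's pending receives, which is why cancelling only the returned stage would leave the pending receive in place.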
Yes, this seems to be the better approach if we want to keep supporting the ReaderImpl.readNextAsync().cancel() behavior.
I see that the original future, multiTopicsConsumer.receiveAsync() in MultiTopicsReaderImpl.readNextAsync(), is not cancellable now. Do we want to make the same change (adding a CompletableFutureCancellationHandler) in MultiTopicsReaderImpl.readNextAsync() too? Is this another bug?
I assume the current behavior of multiTopicsConsumer.receiveAsync().cancel() is invalid. I also fixed this part and added a test to cover multiTopicsConsumer.receiveAsync().cancel() too.
LGTM
@heesung-sn Can you help open another PR to cherry-pick this to branch-2.10? There are some conflicts with direct cherry-picking.
Raised a cherry-pick PR here.
Hi @heesung-sn I'm not sure if we introduce two things.
yes, this PR introduce
Regarding
This has been already supported in the ReaderImpl so no addition for ReaderImpl. However, the |
apache#17728) (apache#17828) (cherry picked from commit bbccb62)
Motivation
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java#L190
I see ack failures due to null message id -- because we release the msg in TableViewImpl before reader.acknowledgeCumulativeAsync.
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java#L176
The root cause is that we return receiveFuture here instead of receiveFuture.whenComplete.
Modifications
Return the CompletableFuture from .whenComplete(.. acknowledgeCumulativeAsync ).
Verifying this change
This change added tests and can be verified as follows:
acknowledgeCumulativeAsync() that gets called after the messageId null check.
Does this pull request potentially affect one of the following parts:
Matching PR in forked repository
PR in forked repository: heesung-sn#9