From 5f09ec79f43807ffaa145c87fb8b80c238ee76b3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 5 Mar 2024 13:48:46 +0800 Subject: [PATCH 1/7] [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest ### Motivation Java client has the same issue as https://github.com/apache/pulsar-client-python/issues/199 After a seek operation is done, the `startMessageId` will not be updated until the reconnection due to the seek is done in `connectionOpened`. So before it's updated, `hasMessageAvailable` could compare with an outdated `startMessageId` and return a wrong value. ### Modifications Replace `duringSeek` with a `SeekStatus` field: - `NOT_STARTED`: initial, or a seek operation is done. `seek` could only succeed in this status. - `IN_PROGRESS`: A seek operation has started but the client has not received the response from the broker. - `COMPLETED`: The client has received the seek response but the seek future is not done. After the status becomes `COMPLETED`, if the connection is not ready, next time the connection is established, the status will change from `COMPLETED` to `NOT_STARTED` and then the seek future will be completed in the internal executor. Add `testHasMessageAvailableAfterSeek` to cover this change. 
--- .../apache/pulsar/client/impl/ReaderTest.java | 24 +++++ .../pulsar/client/impl/ConsumerImpl.java | 102 +++++++++++------- .../pulsar/client/impl/ConsumerImplTest.java | 8 +- 3 files changed, 95 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 4e4dc8273d3f0..e28081c1a86b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -68,6 +68,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -813,4 +814,27 @@ public void testReaderReconnectedFromNextEntry() throws Exception { producer.close(); admin.topics().delete(topic, false); } + + @DataProvider + public static Object[][] initializeLastMessageIdInBroker() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "initializeLastMessageIdInBroker") + public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception { + final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek"; + @Cleanup Reader reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) + .startMessageId(MessageId.earliest).create(); + + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + producer.send("msg"); + + if (initializeLastMessageIdInBroker) { + assertTrue(reader.hasMessageAvailable()); + } // else: lastMessageIdInBroker is earliest + + reader.seek(MessageId.latest); + // lastMessageIdInBroker is the last message ID, while startMessageId is still earliest + assertFalse(reader.hasMessageAvailable()); + } } diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5619837757363..672972b04cabc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -167,7 +167,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private volatile MessageIdAdv startMessageId; private volatile MessageIdAdv seekMessageId; - private final AtomicBoolean duringSeek; + @VisibleForTesting + final AtomicReference seekStatus; + private volatile CompletableFuture seekFuture; private final MessageIdAdv initialStartMessageId; @@ -304,7 +306,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat stats = ConsumerStatsDisabled.INSTANCE; } - duringSeek = new AtomicBoolean(false); + seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED); // Create msgCrypto if not created already if (conf.getCryptoKeyReader() != null) { @@ -781,7 +783,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { closeConsumerTasks(); deregisterFromClientCnx(); client.cleanupConsumer(this); - clearReceiverQueue(); + clearReceiverQueue(false); return CompletableFuture.completedFuture(null); } @@ -789,7 +791,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { topic, subscription, cnx.ctx().channel(), consumerId); long requestId = client.newRequestId(); - if (duringSeek.get()) { + if (!SeekStatus.NOT_STARTED.equals(seekStatus.get())) { acknowledgmentsGroupingTracker.flushAndClean(); } @@ -800,7 +802,8 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { int currentSize; synchronized (this) { currentSize = incomingMessages.size(); - startMessageId = clearReceiverQueue(); + setClientCnx(cnx); + clearReceiverQueue(true); if (possibleSendToDeadLetterTopicMessages != null) { 
possibleSendToDeadLetterTopicMessages.clear(); } @@ -838,7 +841,6 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them final CompletableFuture future = new CompletableFuture<>(); synchronized (this) { - setClientCnx(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, conf.isReplicateSubscriptionState(), @@ -943,15 +945,23 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was * not seen by the application. */ - private MessageIdAdv clearReceiverQueue() { + private void clearReceiverQueue(boolean updateStartMessageId) { List> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); resetIncomingMessageSize(); - if (duringSeek.compareAndSet(true, false)) { - return seekMessageId; - } else if (subscriptionMode == SubscriptionMode.Durable) { - return startMessageId; + CompletableFuture seekFuture = this.seekFuture; + MessageIdAdv seekMessageId = this.seekMessageId; + + if (updateStartMessageId && seekStatus.get() != SeekStatus.NOT_STARTED) { + startMessageId = seekMessageId; + } + if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) { + internalPinnedExecutor.execute(() -> seekFuture.complete(null)); + return; + } + if (subscriptionMode == SubscriptionMode.Durable) { + return; } if (!currentMessageQueue.isEmpty()) { @@ -968,15 +978,14 @@ private MessageIdAdv clearReceiverQueue() { } // release messages if they are pooled messages currentMessageQueue.forEach(Message::release); - return previousMessage; - } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) { + if (updateStartMessageId) { + 
startMessageId = previousMessage; + } + } else if (updateStartMessageId && !lastDequeuedMessageId.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past - return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); - } else { - // No message was received or dequeued by this consumer. Next message would still be the startMessageId - return startMessageId; - } + startMessageId = new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); + } // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId } /** @@ -2249,25 +2258,23 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(); - CompletableFuture seekFuture = new CompletableFuture<>(); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture); + if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) { + final String message = String.format( + "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", + topic, subscription, seekBy); + log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", + topic, subscription, seekBy); + return FutureUtil.failedFuture(new IllegalStateException(message)); + } + seekFuture = new CompletableFuture<>(); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs); return seekFuture; } private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy, - final Backoff backoff, final AtomicLong remainingTime, - CompletableFuture seekFuture) { + final Backoff backoff, final AtomicLong remainingTime) { ClientCnx cnx = cnx(); if (isConnected() && cnx != null) { - if (!duringSeek.compareAndSet(false, true)) { - final String message = String.format( - "[%s][%s] attempting to seek operation that is already 
in progress (seek by %s)", - topic, subscription, seekBy); - log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", - topic, subscription, seekBy); - seekFuture.completeExceptionally(new IllegalStateException(message)); - return; - } MessageIdAdv originSeekMessageId = seekMessageId; seekMessageId = (MessageIdAdv) seekId; log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); @@ -2279,14 +2286,22 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S lastDequeuedMessageId = MessageId.earliest; clearIncomingMessages(); - seekFuture.complete(null); + synchronized (this) { + if (cnx() == null) { + // It's during reconnection, complete the seek future after connection is established + seekStatus.set(SeekStatus.COMPLETED); + } else { + final CompletableFuture future = seekFuture; + startMessageId = seekMessageId; + seekStatus.set(SeekStatus.NOT_STARTED); + future.complete(null); + } + } }).exceptionally(e -> { - // re-set duringSeek and seekMessageId if seek failed seekMessageId = originSeekMessageId; - duringSeek.set(false); log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); - seekFuture.completeExceptionally( + failSeek( PulsarClientException.wrap(e.getCause(), String.format("Failed to seek the subscription %s of the topic %s to %s", subscription, topicName.toString(), seekBy))); @@ -2295,7 +2310,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S } else { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { - seekFuture.completeExceptionally( + failSeek( new PulsarClientException.TimeoutException( String.format("The subscription %s of the topic %s could not seek " + "withing configured timeout", subscription, topicName.toString()))); @@ -2306,11 +2321,18 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S log.warn("[{}] [{}] Could 
not get connection while seek -- Will try again in {} ms", topic, getHandlerName(), nextDelay); remainingTime.addAndGet(-nextDelay); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime); }, nextDelay, TimeUnit.MILLISECONDS); } } + private void failSeek(Throwable throwable) { + CompletableFuture seekFuture = this.seekFuture; + if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED)) { + seekFuture.completeExceptionally(throwable); + } + } + @Override public CompletableFuture seekAsync(long timestamp) { String seekBy = String.format("the timestamp %d", timestamp); @@ -2968,4 +2990,10 @@ boolean isAckReceiptEnabled() { private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); + @VisibleForTesting + enum SeekStatus { + NOT_STARTED, + IN_PROGRESS, + COMPLETED + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 070919c57a420..ae622cb7e3b0c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -29,6 +29,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; import java.util.concurrent.CompletableFuture; @@ -283,6 +285,7 @@ public void testSeekAsyncInternal() { consumer.setClientCnx(cnx); consumer.setState(HandlerState.State.Ready); + consumer.seekStatus.set(ConsumerImpl.SeekStatus.NOT_STARTED); // when CompletableFuture firstResult = consumer.seekAsync(1L); @@ -290,8 +293,9 @@ public void testSeekAsyncInternal() { 
clientReq.complete(null); - // then - assertTrue(firstResult.isDone()); + // The seek future will be completed in connectionOpened after receiving the seek response + assertFalse(firstResult.isDone()); + assertEquals(consumer.seekStatus.get(), ConsumerImpl.SeekStatus.COMPLETED); assertTrue(secondResult.isCompletedExceptionally()); verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong()); } From 1a491188a78e689f639e45103ba88d1727fee95f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 5 Mar 2024 15:45:18 +0800 Subject: [PATCH 2/7] Move future.complete() out of the synchronized block --- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 672972b04cabc..9224042834062 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2286,17 +2286,20 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S lastDequeuedMessageId = MessageId.earliest; clearIncomingMessages(); + CompletableFuture future = null; synchronized (this) { if (cnx() == null) { // It's during reconnection, complete the seek future after connection is established seekStatus.set(SeekStatus.COMPLETED); } else { - final CompletableFuture future = seekFuture; + future = seekFuture; startMessageId = seekMessageId; seekStatus.set(SeekStatus.NOT_STARTED); - future.complete(null); } } + if (future != null) { + future.complete(null); + } }).exceptionally(e -> { seekMessageId = originSeekMessageId; log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); From 4ba37b9cc4725f2929cd3d64373022d999b0da3c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 5 Mar 2024 15:47:12 +0800 
Subject: [PATCH 3/7] Use != to compare enum --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9224042834062..f9907204bc674 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -791,7 +791,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { topic, subscription, cnx.ctx().channel(), consumerId); long requestId = client.newRequestId(); - if (!SeekStatus.NOT_STARTED.equals(seekStatus.get())) { + if (seekStatus.get() != SeekStatus.NOT_STARTED) { acknowledgmentsGroupingTracker.flushAndClean(); } From e06278954480c699a97203140e20ca85fce0c308 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 5 Mar 2024 16:24:36 +0800 Subject: [PATCH 4/7] Fix clearReceiverQueue() might not return when seekStatus is IN_PROGRESS --- .../apache/pulsar/client/impl/ConsumerImpl.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f9907204bc674..a4bcd8521755b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -953,14 +953,15 @@ private void clearReceiverQueue(boolean updateStartMessageId) { CompletableFuture seekFuture = this.seekFuture; MessageIdAdv seekMessageId = this.seekMessageId; - if (updateStartMessageId && seekStatus.get() != SeekStatus.NOT_STARTED) { - startMessageId = seekMessageId; - } - if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) { - internalPinnedExecutor.execute(() 
-> seekFuture.complete(null)); + if (seekStatus.get() != SeekStatus.NOT_STARTED) { + if (updateStartMessageId) { + startMessageId = seekMessageId; + } + if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) { + internalPinnedExecutor.execute(() -> seekFuture.complete(null)); + } return; - } - if (subscriptionMode == SubscriptionMode.Durable) { + } else if (subscriptionMode == SubscriptionMode.Durable) { return; } From fa4c9d7cfe6dedbcf0e0628b26318becd474776f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 5 Mar 2024 16:59:42 +0800 Subject: [PATCH 5/7] Fix tests --- .../java/org/apache/pulsar/client/impl/ConsumerImplTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index ae622cb7e3b0c..71c5602f94abb 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -293,9 +293,7 @@ public void testSeekAsyncInternal() { clientReq.complete(null); - // The seek future will be completed in connectionOpened after receiving the seek response - assertFalse(firstResult.isDone()); - assertEquals(consumer.seekStatus.get(), ConsumerImpl.SeekStatus.COMPLETED); + assertTrue(firstResult.isDone()); assertTrue(secondResult.isCompletedExceptionally()); verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong()); } From 9b65f62e6138e2445784d496faab3794446c6736 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 5 Mar 2024 17:26:24 +0800 Subject: [PATCH 6/7] Fix checkstyle --- .../java/org/apache/pulsar/client/impl/ConsumerImplTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java 
index 71c5602f94abb..9995246c175e1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -29,8 +29,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; import java.util.concurrent.CompletableFuture; From 901c6a9d671b6d501ff5e297db9b0b2ae7aedb2c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 5 Mar 2024 22:09:46 +0800 Subject: [PATCH 7/7] Fix testSeekCustomTopicMessageId --- .../test/java/org/apache/pulsar/client/impl/ReaderTest.java | 3 +++ .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index e28081c1a86b5..cee3ea09968dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -836,5 +836,8 @@ public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBr reader.seek(MessageId.latest); // lastMessageIdInBroker is the last message ID, while startMessageId is still earliest assertFalse(reader.hasMessageAvailable()); + + producer.send("msg"); + assertTrue(reader.hasMessageAvailable()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a4bcd8521755b..c09e0afe58d4b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2289,7 +2289,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S clearIncomingMessages(); CompletableFuture future = null; synchronized (this) { - if (cnx() == null) { + if (!hasParentConsumer && cnx() == null) { // It's during reconnection, complete the seek future after connection is established seekStatus.set(SeekStatus.COMPLETED); } else {