From 5ea139f3f16d9aae978b76e0e23f9878f5b846ff Mon Sep 17 00:00:00 2001 From: Xiaoyu Hou Date: Wed, 13 Jul 2022 11:05:18 +0800 Subject: [PATCH] [improve][broker][PIP-149]Make GetMessageIdByTimestamp pure async (#16446) --- .../admin/impl/PersistentTopicsBase.java | 87 ++++++++++--------- .../broker/admin/v2/PersistentTopics.java | 6 +- 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 908874843ace0..d39b13412d2df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2740,53 +2740,56 @@ public void readEntryComplete(Entry entry, Object ctx) { }); } - protected CompletableFuture internalGetMessageIdByTimestamp(long timestamp, boolean authoritative) { - try { - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - - if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, - authoritative, false).partitions > 0) { - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Get message ID by timestamp on a partitioned topic is not allowed, " - + "please try do it on specific topic partition"); - } - - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES); + protected CompletableFuture internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) { + CompletableFuture future; + if (topicName.isGlobal()) { + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); + } - Topic topic = getTopicReference(topicName); - if (!(topic instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName); - throw new RestException(Status.METHOD_NOT_ALLOWED, + return future.thenCompose(__ -> { + if (topicName.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } else { + return getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenAccept(metadata -> { + if (metadata.partitions > 0) { + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Get message ID by timestamp on a partitioned topic is not allowed, " + + "please try do it on specific topic partition"); + } + }); + } + }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> { + if (!(topic instanceof PersistentTopic)) { + log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName); + throw new RestException(Status.METHOD_NOT_ALLOWED, "Get message ID by timestamp on a non-persistent topic is not allowed"); - } - - ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger(); - return ledger.asyncFindPosition(entry -> { - try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); - return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); - } catch (Exception e) { - log.error("[{}] Error deserializing message for message position find", topicName, e); - } finally { - entry.release(); } - return false; - }).thenApply(position -> { - if (position == null) { - return null; - } else { - return new MessageIdImpl(position.getLedgerId(), position.getEntryId(), + ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger(); + return ledger.asyncFindPosition(entry -> { + try { + long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); + } catch (Exception e) { + log.error("[{}] Error deserializing message for message position find", topicName, e); + } finally { + entry.release(); + } + return false; + }).thenApply(position -> { + if (position == null) { + return null; + } else { + return new MessageIdImpl(position.getLedgerId(), position.getEntryId(), topicName.getPartitionIndex()); - } + } + }); }); - } catch (WebApplicationException exception) { - return FutureUtil.failedFuture(exception); - } catch (Exception exception) { - return FutureUtil.failedFuture(new RestException(exception)); - } } protected CompletableFuture internalPeekNthMessageAsync(String subName, int messagePosition, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index cf3f4e7dc7589..fbd9f008fd864 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1855,7 +1855,7 @@ public void getMessageIdByTimestamp( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - internalGetMessageIdByTimestamp(timestamp, authoritative) + internalGetMessageIdByTimestampAsync(timestamp, authoritative) .thenAccept(messageId -> { if (messageId == null) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Message ID not found")); @@ -1864,8 +1864,10 @@ public void getMessageIdByTimestamp( } }) .exceptionally(ex -> { - log.error("[{}] Failed to get message ID by timestamp {} from {}", + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get message ID by timestamp {} from {}", clientAppId(), timestamp, topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; });