Skip to content

Commit

Permalink
[improve][broker][PIP-149]Make GetMessageIdByTimestamp pure async (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy authored Jul 13, 2022
1 parent 706bf91 commit 5ea139f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2740,53 +2740,56 @@ public void readEntryComplete(Entry entry, Object ctx) {
});
}

protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long timestamp, boolean authoritative) {
try {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}

if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
authoritative, false).partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a partitioned topic is not allowed, "
+ "please try do it on specific topic partition");
}

validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

Topic topic = getTopicReference(topicName);
if (!(topic instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
return future.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(metadata -> {
if (metadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a partitioned topic is not allowed, "
+ "please try do it on specific topic partition");
}
});
}
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a non-persistent topic is not allowed");
}

ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
return ledger.asyncFindPosition(entry -> {
try {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}] Error deserializing message for message position find", topicName, e);
} finally {
entry.release();
}
return false;
}).thenApply(position -> {
if (position == null) {
return null;
} else {
return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
return ledger.asyncFindPosition(entry -> {
try {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}] Error deserializing message for message position find", topicName, e);
} finally {
entry.release();
}
return false;
}).thenApply(position -> {
if (position == null) {
return null;
} else {
return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
topicName.getPartitionIndex());
}
}
});
});
} catch (WebApplicationException exception) {
return FutureUtil.failedFuture(exception);
} catch (Exception exception) {
return FutureUtil.failedFuture(new RestException(exception));
}
}

protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1855,7 +1855,7 @@ public void getMessageIdByTimestamp(
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageIdByTimestamp(timestamp, authoritative)
internalGetMessageIdByTimestampAsync(timestamp, authoritative)
.thenAccept(messageId -> {
if (messageId == null) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Message ID not found"));
Expand All @@ -1864,8 +1864,10 @@ public void getMessageIdByTimestamp(
}
})
.exceptionally(ex -> {
log.error("[{}] Failed to get message ID by timestamp {} from {}",
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get message ID by timestamp {} from {}",
clientAppId(), timestamp, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down

0 comments on commit 5ea139f

Please sign in to comment.