Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Jul 11, 2022
1 parent 87fdc8a commit 20cf345
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2748,12 +2748,18 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long
future = CompletableFuture.completedFuture(null);
}

return future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(metadata -> {
if (!topicName.isPartitioned() && metadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a partitioned topic is not allowed, "
+ "please try do it on specific topic partition");
return future.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(metadata -> {
if (metadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a partitioned topic is not allowed, "
+ "please try do it on specific topic partition");
}
});
}
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
Expand Down

0 comments on commit 20cf345

Please sign in to comment.