Skip to content

Commit

Permalink
[improve][broker]Only create extended partitions when updating partit…
Browse files Browse the repository at this point in the history
…ion number (#17349)
  • Loading branch information
AnonHxy authored Sep 13, 2022
1 parent 7b32013 commit 16a39e6
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,21 @@ protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
return FutureUtil.waitForAll(futures);
}

protected CompletableFuture<Void> tryCreateExtendedPartitionsAsync(int oldNumPartitions, int numPartitions) {
if (!topicName.isPersistent()) {
return CompletableFuture.completedFuture(null);
}
if (numPartitions <= oldNumPartitions) {
return CompletableFuture.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
"Number of new partitions must be greater than existing number of partitions"));
}
List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions - oldNumPartitions);
for (int i = oldNumPartitions; i < numPartitions; i++) {
futures.add(tryCreatePartitionAsync(i));
}
return FutureUtil.waitForAll(futures);
}

private CompletableFuture<Void> tryCreatePartitionAsync(final int partition) {
CompletableFuture<Void> result = new CompletableFuture<>();
getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,8 @@ protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPar
} else {
return CompletableFuture.completedFuture(null);
}
})
.thenCompose(__ -> {
}).thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
.thenCompose(topicMetadata -> {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
Expand All @@ -483,8 +483,9 @@ protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPar
}
return clusters;
})
.thenCompose(clusters -> tryCreatePartitionsAsync(numPartitions).thenApply(ignore ->
clusters))
.thenCompose(clusters ->
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
.thenApply(ignore -> clusters))
.thenCompose(clusters -> createSubscriptions(topicName, numPartitions, force).thenApply(
ignore -> clusters))
.thenCompose(clusters -> {
Expand All @@ -500,7 +501,7 @@ protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPar
}
});
} else {
return tryCreatePartitionsAsync(numPartitions)
return tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
.thenCompose(ignore -> updatePartitionedTopic(topicName, numPartitions, force));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ public void createNonPartitionedTopic(
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0"
+ " and less than or equal to maxNumPartitionsPerPartitionedTopic"),
+ " and less than or equal to maxNumPartitionsPerPartitionedTopic"
+ " and number of new partitions must be greater than existing number of partitions"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist")})
public void updatePartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,8 @@ public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncRe
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and"
+ " less than or equal to maxNumPartitionsPerPartitionedTopic"),
+ " less than or equal to maxNumPartitionsPerPartitionedTopic"
+ " and number of new partitions must be greater than existing number of partitions"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1600,5 +1600,21 @@ public void testUpdatePartitionedTopic()
verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 4);

// check number of new partitions must be greater than existing number of partitions
response = mock(AsyncResponse.class);
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
true, 3);
verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture());
Assert.assertEquals(throwableCaptor.getValue().getMessage(),
"Number of new partitions must be greater than existing number of partitions");

response = mock(AsyncResponse.class);
metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
}
}

0 comments on commit 16a39e6

Please sign in to comment.