diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 3a77984516838..1b05df826c9a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -162,6 +162,21 @@ protected CompletableFuture tryCreatePartitionsAsync(int numPartitions) { return FutureUtil.waitForAll(futures); } + protected CompletableFuture tryCreateExtendedPartitionsAsync(int oldNumPartitions, int numPartitions) { + if (!topicName.isPersistent()) { + return CompletableFuture.completedFuture(null); + } + if (numPartitions <= oldNumPartitions) { + return CompletableFuture.failedFuture(new RestException(Status.NOT_ACCEPTABLE, + "Number of new partitions must be greater than existing number of partitions")); + } + List> futures = new ArrayList<>(numPartitions - oldNumPartitions); + for (int i = oldNumPartitions; i < numPartitions; i++) { + futures.add(tryCreatePartitionAsync(i)); + } + return FutureUtil.waitForAll(futures); + } + private CompletableFuture tryCreatePartitionAsync(final int partition) { CompletableFuture result = new CompletableFuture<>(); getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition)) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 274a2adf3a496..b477a3dc1d655 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -464,8 +464,8 @@ protected CompletableFuture internalUpdatePartitionedTopicAsync(int numPar } else { return CompletableFuture.completedFuture(null); } - }) - .thenCompose(__ -> { + }).thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)) + .thenCompose(topicMetadata -> { final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); if (maxPartitions > 0 && numPartitions > maxPartitions) { throw new RestException(Status.NOT_ACCEPTABLE, @@ -483,8 +483,9 @@ protected CompletableFuture internalUpdatePartitionedTopicAsync(int numPar } return clusters; }) - .thenCompose(clusters -> tryCreatePartitionsAsync(numPartitions).thenApply(ignore -> - clusters)) + .thenCompose(clusters -> + tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions) + .thenApply(ignore -> clusters)) .thenCompose(clusters -> createSubscriptions(topicName, numPartitions, force).thenApply( ignore -> clusters)) .thenCompose(clusters -> { @@ -500,7 +501,7 @@ protected CompletableFuture internalUpdatePartitionedTopicAsync(int numPar } }); } else { - return tryCreatePartitionsAsync(numPartitions) + return tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions) .thenCompose(ignore -> updatePartitionedTopic(topicName, numPartitions, force)); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index df32386152ac6..8e3f3adbfeeb3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -273,7 +273,8 @@ public void createNonPartitionedTopic( @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0" - + " and less than or equal to maxNumPartitionsPerPartitionedTopic"), + + " and less than or equal to maxNumPartitionsPerPartitionedTopic" + + " and number of new partitions must be greater than existing number of partitions"), @ApiResponse(code = 409, message = "Partitioned topic does not exist")}) public void updatePartitionedTopic( @Suspended final AsyncResponse asyncResponse, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index f283bb7aa5c28..742f27161f63f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -832,7 +832,8 @@ public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncRe @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant does not exist"), @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and" - + " less than or equal to maxNumPartitionsPerPartitionedTopic"), + + " less than or equal to maxNumPartitionsPerPartitionedTopic" + + " and number of new partitions must be greater than existing number of partitions"), @ApiResponse(code = 409, message = "Partitioned topic does not exist"), @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), @ApiResponse(code = 500, message = "Internal server error") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 29d81d609b49e..e4097b68e880a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1600,5 +1600,21 @@ public void testUpdatePartitionedTopic() verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); partitionedTopicMetadata = metaCaptor.getValue(); Assert.assertEquals(partitionedTopicMetadata.partitions, 4); + + // check number of new partitions must be greater than existing number of partitions + response = mock(AsyncResponse.class); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true, + true, 3); + verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture()); + Assert.assertEquals(throwableCaptor.getValue().getMessage(), + "Number of new partitions must be greater than existing number of partitions"); + + response = mock(AsyncResponse.class); + metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class); + persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false); + verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); + partitionedTopicMetadata = metaCaptor.getValue(); + Assert.assertEquals(partitionedTopicMetadata.partitions, 4); } }