Skip to content

Commit

Permalink
[fix] [broker] [branch-3.0] Fast fix infinite HTTP call createSubscri…
Browse files Browse the repository at this point in the history
…ptions caused by wrong topicName (apache#21997)

Similar to: apache#20131

The master branch has fixed the issue by apache#19841 Since it will makes users can not receive the messages which created in mistake, we did not cherry-pick apache#19841 into other branches, see detail apache#19841)

It works like this:
1. createSubscription( `tp1` )
2. is partitioned topic?
  `no`: return subscriptions
  `yes`: createSubscription(`tp1-partition-0`)....createSubscription(`tp1-partition-n`)

---

```java
String partitionedTopic = "tp1-partition-0-DLQ";

TopicName partition0 = partitionedTopic.getPartition(0);// Highlight: the partition0.toString() will be "tp1-partition-0-DLQ"(it is wrong).The correct value is "tp1-partition-0-DLQ-partition-0"
```

Therefore, if there has a partitioned topic named `tp1-partition-0-DLQ`, the method `PersistentTopics.createSubscription` will works like this:
1. call Admin API ``PersistentTopics.createSubscription("tp1-partition-0-DLQ")`
2. is partitioned topic?
3. yes, call `TopicName.getPartition(0)` to get partition 0 and will get `tp1-partition-0-DLQ` , then loop to step-1.

Then the infinite HTTP call `PersistentTopics.createSubscription` makes the broker crash.

If hits the issue which makes the topic name wrong, do not loop to step 1.

The PR apache#19841 fixes the issue which makes the topic name wrong, and this PR will create unfriendly compatibility, and PIP 263 apache#20033 will make compatibility good.

(cherry picked from commit 4386401)
  • Loading branch information
poorbarcode authored and mukesh-ctds committed Apr 17, 2024
1 parent 8941207 commit 2cea505
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2306,7 +2306,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
.thenCompose(allowAutoTopicCreation -> getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
if (partitionMetadata.partitions > 0 && !isUnexpectedTopicName(partitionMetadata)) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerConsumerBase {

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setDefaultNumPartitions(1);
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testInfiniteHttpCallGetOrCreateSubscriptions() throws Exception {
final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
final String partitionedTopicName = "persistent://my-property/my-ns/tp1_" + randomStr;
final String topic_p0 = partitionedTopicName + TopicName.PARTITIONED_TOPIC_SUFFIX + "0";
final String subscriptionName = "sub1";
final String topicDLQ = topic_p0 + "-" + subscriptionName + "-DLQ";

admin.topics().createPartitionedTopic(partitionedTopicName, 2);

// Do test.
ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, topicDLQ, subscriptionName);
admin.topics().getSubscriptions(topicDLQ);
admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest);

// cleanup.
pcEntry.consumer.close();
pcEntry.producer.close();
admin.topics().deletePartitionedTopic(partitionedTopicName);
}

@Test
public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws Exception {
final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0-abc";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();

// Do test.
admin.topics().getSubscriptions(topicName);
admin.topics().createSubscription(topicName, "s1", MessageId.earliest);

// cleanup.
producer.close();
}

@Test
public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws Exception {
final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();

// Do test.
admin.topics().getSubscriptions(topicName);
admin.topics().createSubscription(topicName, "s1", MessageId.earliest);

// cleanup.
producer.close();
}

@AllArgsConstructor
private static class ProducerAndConsumerEntry {
private Producer<String> producer;
private Consumer<String> consumer;
}

private ProducerAndConsumerEntry triggerDLQCreated(String topicName, String DLQName, String subscriptionName) throws Exception {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic(DLQName).maxRedeliverCount(2).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();
// send messages.
for (int i = 0; i < 5; i++) {
producer.newMessage()
.value("value-" + i)
.sendAsync();
}
producer.flush();
// trigger the DLQ created.
for (int i = 0; i < 20; i++) {
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg != null) {
consumer.reconsumeLater(msg, 1, TimeUnit.SECONDS);
} else {
break;
}
}

return new ProducerAndConsumerEntry(producer, consumer);
}
}

0 comments on commit 2cea505

Please sign in to comment.