Skip to content

Commit

Permalink
[fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadMana…
Browse files Browse the repository at this point in the history
…ger is enabled (apache#22496)

(cherry picked from commit 203f305)
  • Loading branch information
heesung-sn committed Jun 26, 2024
1 parent 7b32451 commit 15db04d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -1377,8 +1376,8 @@ private synchronized void doCleanup(String broker) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight non-system bundle override messages.", e);
}

Expand All @@ -1401,8 +1400,8 @@ private synchronized void doCleanup(String broker) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight system bundle override messages.", e);
}

Expand Down Expand Up @@ -1580,8 +1579,8 @@ protected void monitorOwnerships(List<String> brokers) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight messages.", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,11 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
}

public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
.thenApply(Optional::isPresent);
}
return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,8 @@ public CompletableFuture<Void> stopReplProducers() {
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1817,7 +1817,8 @@ CompletableFuture<Void> checkPersistencePolicies() {
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand All @@ -38,6 +40,8 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.lang.reflect.Method;
Expand All @@ -52,6 +56,18 @@
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {

protected String methodName;
@DataProvider(name = "loadManagerClassName")
public static Object[][] loadManagerClassName() {
return new Object[][]{
{ModularLoadManagerImpl.class.getName()},
{ExtensibleLoadManagerImpl.class.getName()}
};
}

@Factory(dataProvider = "loadManagerClassName")
public ReplicatorGlobalNSTest(String loadManagerClassName) {
this.loadManagerClassName = loadManagerClassName;
}

@BeforeMethod
public void beforeMethod(Method m) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
protected final String cluster2 = "r2";
protected final String cluster3 = "r3";
protected final String cluster4 = "r4";
protected String loadManagerClassName;

protected String getLoadManagerClassName() {
return loadManagerClassName;
}

// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
Expand Down Expand Up @@ -271,8 +276,9 @@ protected void setup() throws Exception {
.brokerClientTlsTrustStoreType(keyStoreType)
.build());

admin1.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
updateTenantInfo("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", "r2", "r3")));
admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2"));

Expand Down Expand Up @@ -344,6 +350,7 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadManagerClassName(getLoadManagerClassName());
}

public void resetConfig1() {
Expand Down Expand Up @@ -436,6 +443,14 @@ protected void cleanup() throws Exception {
resetConfig4();
}

protected void updateTenantInfo(String tenant, TenantInfoImpl tenantInfo) throws Exception {
if (!admin1.tenants().getTenants().contains(tenant)) {
admin1.tenants().createTenant(tenant, tenantInfo);
} else {
admin1.tenants().updateTenant(tenant, tenantInfo);
}
}

static class MessageProducer implements AutoCloseable {
URL url;
String namespace;
Expand Down

0 comments on commit 15db04d

Please sign in to comment.