From 703dbce1c9f4bbb1a0994e73aefdfd0b420b9675 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 28 Feb 2024 13:07:06 -0800 Subject: [PATCH 1/6] [fix][broker] Fix broker not starting when both transactions and the Extensible Load Manager are enabled (#22139) --- .../persistent/PersistentSubscription.java | 4 +- .../service/persistent/PersistentTopic.java | 3 +- .../ExtensibleLoadManagerImplBaseTest.java | 162 ++++++++++++++++++ .../ExtensibleLoadManagerImplTest.java | 129 +------------- ...gerImplWithTransactionCoordinatorTest.java | 55 ++++++ .../messaging/MessagingSmokeTest.java | 107 ++++++++++++ .../integration/topologies/PulsarCluster.java | 2 +- .../topologies/PulsarClusterTestBase.java | 15 ++ .../src/test/resources/pulsar-messaging.xml | 1 + 9 files changed, 354 insertions(+), 124 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 42db3dad175c1..949dc398b32fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -58,6 +58,7 @@ import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.AnalyzeBacklogResult; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -158,7 +159,8 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() - && !isEventSystemTopic(TopicName.get(topicName))) { + && !isEventSystemTopic(TopicName.get(topicName)) + && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) { this.pendingAckHandle = new PendingAckHandleImpl(this); } else { this.pendingAckHandle = new PendingAckHandleDisabled(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 47e314bb3198b..789c828b3d723 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -328,7 +328,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() && !isEventSystemTopic(topicName) - && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) + && !ExtensibleLoadManagerImpl.isInternalTopic(topic)) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java new file mode 100644 index 0000000000000..ab6c0b24551fc --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import com.google.common.collect.Sets; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServiceBaseTest { + + protected PulsarService pulsar1; + protected PulsarService pulsar2; + + protected PulsarTestContext additionalPulsarTestContext; + + protected ExtensibleLoadManagerImpl primaryLoadManager; + + protected ExtensibleLoadManagerImpl secondaryLoadManager; + + protected ServiceUnitStateChannelImpl channel1; + protected ServiceUnitStateChannelImpl channel2; + + protected final String defaultTestNamespace; + + protected LookupService lookupService; + + protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) { + this.defaultTestNamespace = defaultTestNamespace; + } + + protected ServiceConfiguration initConfig(ServiceConfiguration conf) { + conf.setForceDeleteNamespaceAllowed(true); + conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + conf.setAllowAutoTopicCreation(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + conf.setLoadBalancerSheddingEnabled(false); + conf.setLoadBalancerDebugModeEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + return conf; + } + + @Override + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + initConfig(conf); + super.internalSetup(conf); + pulsar1 = pulsar; + var conf2 = initConfig(getDefaultConf()); + additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2); + pulsar2 = additionalPulsarTestContext.getPulsarService(); + + setPrimaryLoadManager(); + setSecondaryLoadManager(); + + admin.clusters().createCluster(this.conf.getClusterName(), + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), + Sets.newHashSet(this.conf.getClusterName()))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", + Sets.newHashSet(this.conf.getClusterName())); + + admin.namespaces().createNamespace(defaultTestNamespace, 128); + admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, + Sets.newHashSet(this.conf.getClusterName())); + lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + this.additionalPulsarTestContext.close(); + super.internalCleanup(); + } + + @BeforeMethod(alwaysRun = true) + protected void initializeState() throws PulsarAdminException, IllegalAccessException { + admin.namespaces().unload(defaultTestNamespace); + reset(primaryLoadManager, secondaryLoadManager); + FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); + pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + } + + protected void setPrimaryLoadManager() throws IllegalAccessException { + ExtensibleLoadManagerWrapper wrapper = + (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get(); + primaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true); + channel1 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true); + } + + private void setSecondaryLoadManager() throws IllegalAccessException { + ExtensibleLoadManagerWrapper wrapper = + (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get(); + secondaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true); + channel2 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); + } + + protected CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { + return pulsar.getNamespaceService().getBundleAsync(topic); + } + + protected Pair getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix) + throws Exception { + TopicName changeEventsTopicName = + TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get(); + int i = 0; + while(true) { + TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + if (!bundle.equals(changeEventsBundle)) { + return Pair.of(topicName, bundle); + } + i++; + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 43b84a5426f3c..c78ebd7011426 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -56,7 +56,11 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.net.URL; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -67,15 +71,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.Optional; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; @@ -100,21 +99,16 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; -import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; -import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; @@ -122,9 +116,6 @@ import org.awaitility.Awaitility; import org.mockito.MockedStatic; import org.testng.AssertJUnit; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** @@ -132,75 +123,11 @@ */ @Slf4j @Test(groups = "broker") -public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { - - private PulsarService pulsar1; - private PulsarService pulsar2; - - private PulsarTestContext additionalPulsarTestContext; - - private ExtensibleLoadManagerImpl primaryLoadManager; - - private ExtensibleLoadManagerImpl secondaryLoadManager; - - private ServiceUnitStateChannelImpl channel1; - private ServiceUnitStateChannelImpl channel2; - - private final String defaultTestNamespace = "public/test"; - - private static void initConfig(ServiceConfiguration conf){ - conf.setForceDeleteNamespaceAllowed(true); - conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - conf.setAllowAutoTopicCreation(true); - conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); - conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - conf.setLoadBalancerSheddingEnabled(false); - conf.setLoadBalancerDebugModeEnabled(true); - conf.setTopicLevelPoliciesEnabled(true); - } - - @BeforeClass - @Override - public void setup() throws Exception { - // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid - // stuck when doing unload. - initConfig(conf); - super.internalSetup(conf); - pulsar1 = pulsar; - ServiceConfiguration defaultConf = getDefaultConf(); - initConfig(defaultConf); - additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf); - pulsar2 = additionalPulsarTestContext.getPulsarService(); - - setPrimaryLoadManager(); +@SuppressWarnings("unchecked") +public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBaseTest { - setSecondaryLoadManager(); - - admin.clusters().createCluster(this.conf.getClusterName(), - ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); - admin.tenants().createTenant("public", - new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), - Sets.newHashSet(this.conf.getClusterName()))); - admin.namespaces().createNamespace("public/default"); - admin.namespaces().setNamespaceReplicationClusters("public/default", - Sets.newHashSet(this.conf.getClusterName())); - - admin.namespaces().createNamespace(defaultTestNamespace); - admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, - Sets.newHashSet(this.conf.getClusterName())); - } - - @Override - @AfterClass(alwaysRun = true) - protected void cleanup() throws Exception { - this.additionalPulsarTestContext.close(); - super.internalCleanup(); - } - - @BeforeMethod(alwaysRun = true) - protected void initializeState() throws PulsarAdminException { - admin.namespaces().unload(defaultTestNamespace); - reset(primaryLoadManager, secondaryLoadManager); + public ExtensibleLoadManagerImplTest() { + super("public/test"); } @Test @@ -1301,44 +1228,4 @@ public String name() { } } - - private void setPrimaryLoadManager() throws IllegalAccessException { - ExtensibleLoadManagerWrapper wrapper = - (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get(); - primaryLoadManager = spy((ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper, "loadManager", true)); - FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true); - channel1 = (ServiceUnitStateChannelImpl) - FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true); - } - - private void setSecondaryLoadManager() throws IllegalAccessException { - ExtensibleLoadManagerWrapper wrapper = - (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get(); - secondaryLoadManager = spy((ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper, "loadManager", true)); - FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true); - channel2 = (ServiceUnitStateChannelImpl) - FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); - } - - private CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { - return pulsar.getNamespaceService().getBundleAsync(topic); - } - - private Pair getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix) - throws Exception { - TopicName changeEventsTopicName = - TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); - NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get(); - int i = 0; - while (true) { - TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i); - NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); - if (!bundle.equals(changeEventsBundle)) { - return Pair.of(topicName, bundle); - } - i++; - } - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java new file mode 100644 index 0000000000000..0c95dd85f28e0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.testng.Assert.assertEquals; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends ExtensibleLoadManagerImplBaseTest { + + public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() { + super("public/test-elb-with-tx"); + } + + @Override + protected ServiceConfiguration initConfig(ServiceConfiguration conf) { + conf = super.initConfig(conf); + conf.setTransactionCoordinatorEnabled(true); + return conf; + } + + @Test(timeOut = 30 * 1000) + public void testUnloadAdminAPI() throws Exception { + var topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-unload"); + var topicName = topicAndBundle.getLeft(); + var bundle = topicAndBundle.getRight(); + + var srcBroker = admin.lookups().lookupTopic(topicName.toString()); + var dstBroker = srcBroker.equals(pulsar1.getBrokerServiceUrl()) ? pulsar2 : pulsar1; + var dstBrokerUrl = dstBroker.getBrokerId(); + var dstBrokerServiceUrl = dstBroker.getBrokerServiceUrl(); + + admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.lookups().lookupTopic(topicName.toString()), dstBrokerServiceUrl)); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java new file mode 100644 index 0000000000000..618053ac000e2 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.messaging; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.common.naming.TopicDomain; +import org.testng.ITest; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +public class MessagingSmokeTest extends TopicMessagingBase implements ITest { + + @Factory + public static Object[] messagingTests() { + List tests = List.of( + new MessagingSmokeTest("Extensible Load Manager", + Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(), + "loadBalancerLoadSheddingStrategy", TransferShedder.class.getName())), + new MessagingSmokeTest("Extensible Load Manager with TX Coordinator", + Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(), + "loadBalancerLoadSheddingStrategy", TransferShedder.class.getName(), + "transactionCoordinatorEnabled", "true")) + ); + return tests.toArray(); + } + + private final String name; + + public MessagingSmokeTest(String name, Map brokerEnvs) { + super(); + this.brokerEnvs.putAll(brokerEnvs); + this.name = name; + } + + @Override + public String getTestName() { + return name; + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index c648f6968e24c..e5460247c74d0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -133,7 +133,7 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s this.brokerContainers = Maps.newTreeMap(); this.workerContainers = Maps.newTreeMap(); - this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls) + this.proxyContainer = new ProxyContainer(clusterName, appendClusterName(ProxyContainer.NAME), spec.enableTls) .withNetwork(network) .withNetworkAliases(appendClusterName("pulsar-proxy")) .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java index ae9e44fa98254..93e2221ab2493 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java @@ -25,6 +25,7 @@ import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.naming.TopicDomain; import org.testng.annotations.DataProvider; import java.util.stream.Stream; @@ -86,6 +87,20 @@ public Object[][] serviceAndAdminUrls() { }; } + @DataProvider + public Object[][] serviceUrlAndTopicDomain() { + return new Object[][] { + { + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), + TopicDomain.persistent + }, + { + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), + TopicDomain.non_persistent + }, + }; + } + protected PulsarAdmin pulsarAdmin; protected PulsarCluster pulsarCluster; diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml index cfbdb22587034..c6cd900d79151 100644 --- a/tests/integration/src/test/resources/pulsar-messaging.xml +++ b/tests/integration/src/test/resources/pulsar-messaging.xml @@ -28,6 +28,7 @@ + From 48d6ba2a73e50834eb591255feeaf4ad0e358115 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 29 Feb 2024 17:41:49 -0800 Subject: [PATCH 2/6] Fix test build --- .../extensions/ExtensibleLoadManagerImplBaseTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index ab6c0b24551fc..d9c6f78b8d01f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -116,8 +116,6 @@ protected void initializeState() throws PulsarAdminException, IllegalAccessExcep admin.namespaces().unload(defaultTestNamespace); reset(primaryLoadManager, secondaryLoadManager); FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); - pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); - pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); } protected void setPrimaryLoadManager() throws IllegalAccessException { From eeb65022a5339f3b38499db12dbe1a41553cc18c Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 29 Feb 2024 18:03:08 -0800 Subject: [PATCH 3/6] Fix build --- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 789c828b3d723..f2feb8a8b4ecc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -83,6 +83,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; import org.apache.pulsar.broker.namespace.NamespaceService; From 1850bdf273261e3233c12881432bd501bcfe605c Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 29 Feb 2024 18:25:24 -0800 Subject: [PATCH 4/6] Remove redundant field --- .../extensions/ExtensibleLoadManagerImplBaseTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index d9c6f78b8d01f..d702969eda546 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -58,8 +58,6 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ protected final String defaultTestNamespace; - protected LookupService lookupService; - protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) { this.defaultTestNamespace = defaultTestNamespace; } @@ -101,7 +99,6 @@ protected void setup() throws Exception { admin.namespaces().createNamespace(defaultTestNamespace, 128); admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, Sets.newHashSet(this.conf.getClusterName())); - lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true); } @Override @@ -115,7 +112,6 @@ protected void cleanup() throws Exception { protected void initializeState() throws PulsarAdminException, IllegalAccessException { admin.namespaces().unload(defaultTestNamespace); reset(primaryLoadManager, secondaryLoadManager); - FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); } protected void setPrimaryLoadManager() throws IllegalAccessException { @@ -148,7 +144,7 @@ protected Pair getBundleIsNotOwnByChangeEventTopic(S TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get(); int i = 0; - while(true) { + while (true) { TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i); NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); if (!bundle.equals(changeEventsBundle)) { From 8f5acef6c5e119ba7a325dd2a82b45d510a712f5 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Mon, 4 Mar 2024 10:51:24 -0800 Subject: [PATCH 5/6] Remove numBundles parameter in test call to createNamespace --- .../extensions/ExtensibleLoadManagerImplBaseTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index d702969eda546..250cec8c8a590 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -96,7 +96,7 @@ protected void setup() throws Exception { admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet(this.conf.getClusterName())); - admin.namespaces().createNamespace(defaultTestNamespace, 128); + admin.namespaces().createNamespace(defaultTestNamespace); admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, Sets.newHashSet(this.conf.getClusterName())); } From 06de3e87febc180d3d2b49ee981bef3d53a0a593 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Mon, 4 Mar 2024 14:14:23 -0800 Subject: [PATCH 6/6] Fix test org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImplTest#testSplitBundleWithSpecificPositionAdminAPI --- .../extensions/ExtensibleLoadManagerImplBaseTest.java | 3 +-- .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index 250cec8c8a590..3353fbb96f226 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -31,7 +31,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -96,7 +95,7 @@ protected void setup() throws Exception { admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet(this.conf.getClusterName())); - admin.namespaces().createNamespace(defaultTestNamespace); + admin.namespaces().createNamespace(defaultTestNamespace, 128); admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, Sets.newHashSet(this.conf.getClusterName())); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index c78ebd7011426..87c199676e03f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -367,7 +367,7 @@ public void testSplitBundleAdminAPI() throws Exception { public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception { String namespace = defaultTestNamespace; String topic = "persistent://" + namespace + "/test-split-with-specific-position"; - admin.topics().createPartitionedTopic(topic, 10); + admin.topics().createPartitionedTopic(topic, 1024); BundlesData bundles = admin.namespaces().getBundles(namespace); int numBundles = bundles.getNumBundles();