diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index a1f01ea00beb0..eb0910c8afcbd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -850,6 +850,36 @@ public static boolean isInternalTopic(String topic) {
                 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
     }
 
+    /**
+     * Attempts to recover when the channel reports that it has no owner.
+     *
+     * <p>If {@code e} (after unwrapping) carries the "no channel owner now" message, the
+     * LeaderElectionService is started again and the channel owner is looked up once more.
+     *
+     * @param e the failure raised while checking the channel ownership
+     * @return true if a channel owner is present after starting the LeaderElectionService again;
+     *         false if the error is unrelated or still no owner could be found
+     */
+    private boolean handleNoChannelOwnerError(Throwable e) {
+        // Failures such as timeouts or interruptions may carry no message. Treat them as
+        // unrelated instead of failing with a NullPointerException that hides the original error.
+        var cause = FutureUtil.unwrapCompletionException(e);
+        var message = cause == null ? null : cause.getMessage();
+        if (message == null || !message.contains("no channel owner now")) {
+            return false;
+        }
+        var leaderElectionService = getLeaderElectionService();
+        log.warn("No channel owner is found. Trying to start LeaderElectionService again.");
+        leaderElectionService.start();
+        var channelOwner = serviceUnitStateChannel.getChannelOwnerAsync().join();
+        if (channelOwner.isEmpty()) {
+            log.error("Still no Leader is found even after LeaderElectionService restarted.");
+            return false;
+        }
+        log.info("Successfully started LeaderElectionService. The new channel owner is {}", channelOwner);
+        return true;
+    }
+
     @VisibleForTesting
     synchronized void playLeader() {
         log.info("This broker:{} is setting the role from {} to {}",
@@ -861,10 +891,19 @@ synchronized void playLeader() {
                 if (!initWaiter.get() || disabled()) {
                     return;
                 }
-                if (!serviceUnitStateChannel.isChannelOwner()) {
-                    becameFollower = true;
-                    break;
+                try {
+                    if (!serviceUnitStateChannel.isChannelOwner()) {
+                        becameFollower = true;
+                        break;
+                    }
+                } catch (Throwable e) {
+                    if (handleNoChannelOwnerError(e)) {
+                        continue;
+                    } else {
+                        throw e;
+                    }
                 }
+
                 if (disabled()) {
                     return;
                 }
@@ -924,10 +963,19 @@ synchronized void playFollower() {
                 if (!initWaiter.get() || disabled()) {
                     return;
                 }
-                if (serviceUnitStateChannel.isChannelOwner()) {
-                    becameLeader = true;
-                    break;
+                try {
+                    if (serviceUnitStateChannel.isChannelOwner()) {
+                        becameLeader = true;
+                        break;
+                    }
+                } catch (Throwable e) {
+                    if (handleNoChannelOwnerError(e)) {
+                        continue;
+                    } else {
+                        throw e;
+                    }
                 }
+
                 if (disabled()) {
                     return;
                 }
@@ -1015,7 +1063,20 @@ protected void monitor() {
 
             // Monitor role
             // Periodically check the role in case metadata store fails.
-            var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
+
+            boolean isChannelOwner = false;
+            try {
+                isChannelOwner = serviceUnitStateChannel.isChannelOwner();
+            } catch (Throwable e) {
+                if (handleNoChannelOwnerError(e)) {
+                    // The nested call re-checks the ownership from scratch. Stop here instead of
+                    // continuing below with the stale isChannelOwner=false value.
+                    monitor();
+                    return;
+                } else {
+                    throw e;
+                }
+            }
             if (isChannelOwner) {
                 // System topic config might fail due to the race condition
                 // with topic policy init(Topic policies cache have not init).
@@ -1035,7 +1096,7 @@ protected void monitor() {
                 }
             }
         } catch (Throwable e) {
-            log.error("Failed to get the channel ownership.", e);
+            log.error("Failed to monitor load manager state", e);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 6319fc332a678..c1f58b7621c6b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -24,6 +24,8 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
@@ -73,7 +75,10 @@ public interface ServiceUnitStateChannel extends Closeable {
      * Checks if the current broker is the owner broker of the system topic in this channel.
      * @return True if the current broker is the owner. Otherwise, false.
+     * @throws ExecutionException if the ownership lookup completed exceptionally.
+     * @throws InterruptedException if the calling thread was interrupted while waiting.
+     * @throws TimeoutException if the lookup did not complete within the waiting time.
*/ - boolean isChannelOwner(); + boolean isChannelOwner() throws ExecutionException, InterruptedException, TimeoutException; /** * Handles the metadata session events to track diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index dab5cdac819ff..443ff6272a21b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -469,15 +469,9 @@ public CompletableFuture isChannelOwnerAsync() { }); } - public boolean isChannelOwner() { - try { - return isChannelOwnerAsync().get( - MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - String msg = "Failed to get the channel owner."; - log.error(msg, e); - throw new RuntimeException(msg, e); - } + public boolean isChannelOwner() throws ExecutionException, InterruptedException, TimeoutException { + return isChannelOwnerAsync().get( + MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS); } public boolean isOwner(String serviceUnit, String targetBroker) { @@ -1610,8 +1604,13 @@ private CompletableFuture> selectBroker(String serviceUnit, Str @VisibleForTesting protected void monitorOwnerships(List brokers) { - if (!isChannelOwner()) { - log.warn("This broker is not the leader now. Skipping ownership monitor."); + try { + if (!isChannelOwner()) { + log.warn("This broker is not the leader now. 
Skipping ownership monitor."); + return; + } + } catch (Exception e) { + log.error("Failed to monitor ownerships", e); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index a136e22c178a4..ecba040d4f642 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelTest.overrideTableView; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth; @@ -156,12 +157,12 @@ public ExtensibleLoadManagerImplTest() { @Test public void testAssignInternalTopic() throws Exception { Optional brokerLookupData1 = primaryLoadManager.assign( - Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), - getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(), + Optional.of(TopicName.get(TOPIC)), + getBundleAsync(pulsar1, TopicName.get(TOPIC)).get(), LookupOptions.builder().build()).get(); Optional brokerLookupData2 = secondaryLoadManager.assign( - Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)), - getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(), + Optional.of(TopicName.get(TOPIC)), + getBundleAsync(pulsar1, TopicName.get(TOPIC)).get(), 
LookupOptions.builder().build()).get(); assertEquals(brokerLookupData1, brokerLookupData2); assertTrue(brokerLookupData1.isPresent()); @@ -1244,16 +1245,16 @@ private void makePrimaryAsLeader() throws Exception { log.info("makePrimaryAsLeader"); if (channel2.isChannelOwner()) { pulsar2.getLeaderElectionService().close(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { assertTrue(channel1.isChannelOwner()); }); pulsar2.getLeaderElectionService().start(); } - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { assertTrue(channel1.isChannelOwner()); }); - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { assertFalse(channel2.isChannelOwner()); }); } @@ -1361,7 +1362,69 @@ public void testRoleChangeIdempotency() throws Exception { topBundlesLoadDataStoreSecondary, true); } } - @Test(timeOut = 30 * 1000) + + @DataProvider(name = "noChannelOwnerMonitorHandler") + public Object[][] noChannelOwnerMonitorHandler() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101) + public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler) throws Exception { + + makePrimaryAsLeader(); + primaryLoadManager.playLeader(); + secondaryLoadManager.playFollower(); + + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); + + try { + // simulate no owner in the channel + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> { + try { + pulsar1.getLeaderElectionService().close(); + 
pulsar2.getLeaderElectionService().close(); + primaryLoadManager.getServiceUnitStateChannel().isChannelOwner(); + secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner(); + return false; + } catch (ExecutionException e) { + if (e.getCause() instanceof IllegalStateException && e.getMessage() + .contains("no channel owner now")) { + return true; + } else { + return false; + } + } + }); + + // elect new channel owner by either monitor or playLeader/playFollower + if (noChannelOwnerMonitorHandler) { + secondaryLoadManager.monitor(); + primaryLoadManager.monitor(); + } else { + secondaryLoadManager.playLeader(); + primaryLoadManager.playFollower(); + } + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + secondaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + primaryLoadManager.getRole()); + + assertTrue(secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner()); + assertFalse(primaryLoadManager.getServiceUnitStateChannel().isChannelOwner()); + }); + + } finally { + // clean up for monitor test + pulsar1.getLeaderElectionService().start(); + pulsar2.getLeaderElectionService().start(); + } + } + + @Test(timeOut = 30 * 1000, priority = -2) public void testRoleChange() throws Exception { makePrimaryAsLeader(); @@ -1379,9 +1442,6 @@ public void testRoleChange() throws Exception { topBundlesExpected.getTopBundlesLoadData().clear(); topBundlesExpected.getTopBundlesLoadData().add(new TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats())); - follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected); - follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected); - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(), "tableView", true)); @@ -1398,20 +1458,15 @@ public void testRoleChange() throws 
Exception { assertFalse(follower.pulsar.getNamespaceService() .isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get()); } + }); - var actualBrokerLoadLeader = leader.getBrokerLoadDataStore().get(key); - if (actualBrokerLoadLeader.isPresent()) { - assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected); - } - - var actualTopBundlesLeader = leader.getTopBundlesLoadDataStore().get(bundle); - if (actualTopBundlesLeader.isPresent()) { - assertEquals(actualTopBundlesLeader.get(), topBundlesExpected); - } - - var actualBrokerLoadFollower = follower.getBrokerLoadDataStore().get(key); - if (actualBrokerLoadFollower.isPresent()) { - assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { + try { + follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected).get(3, TimeUnit.SECONDS); + follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected).get(3, TimeUnit.SECONDS); + return true; + } catch (Exception e) { + return false; } }); @@ -1423,9 +1478,6 @@ public void testRoleChange() throws Exception { brokerLoadExpected.update(usage, 1, 0, 0, 0, 0, 0, conf); topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 1; - follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected); - follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected); - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(), "tableView", true)); assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), "tableView", true)); @@ -1441,16 +1493,15 @@ public void testRoleChange() throws Exception { assertFalse(follower2.pulsar.getNamespaceService() .isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get()); } - - - var actualBrokerLoadLeader = leader2.getBrokerLoadDataStore().get(key); - 
assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected); - - var actualTopBundlesLeader = leader2.getTopBundlesLoadDataStore().get(bundle); - assertEquals(actualTopBundlesLeader.get(), topBundlesExpected); - - var actualBrokerLoadFollower = follower2.getBrokerLoadDataStore().get(key); - assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected); + }); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { + try { + follower2.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected).get(3, TimeUnit.SECONDS); + follower2.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected).get(3, TimeUnit.SECONDS); + return true; + } catch (Exception e) { + return false; + } }); } @@ -1868,12 +1919,12 @@ public void compactionScheduleTest() { primaryLoadManager.monitor(); secondaryLoadManager.monitor(); var threshold = admin.topicPolicies() - .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false); + .getCompactionThreshold(TOPIC, false); AssertJUnit.assertEquals(5 * 1024 * 1024, threshold == null ? 0 : threshold.longValue()); }); } - @Test(timeOut = 10 * 1000) + @Test(timeOut = 10 * 1000, priority = 5000) public void unloadTimeoutCheckTest() throws Exception { Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("unload-timeout");