From f0f82a38c00485920f3e8291664d7e3c9d21632f Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Thu, 3 Oct 2024 12:50:11 -0700 Subject: [PATCH] [fix][broker] timeout when broker registry hangs and monitor broker registry (ExtensibleLoadManagerImpl only) (#23382) (cherry picked from commit eee9283666cc9d84d0ddb998c279600898121d2b) --- .../pulsar/broker/admin/impl/BrokersBase.java | 11 +- .../extensions/BrokerRegistry.java | 5 + .../extensions/BrokerRegistryImpl.java | 69 ++++++++++-- .../extensions/ExtensibleLoadManagerImpl.java | 19 +++- .../channel/ServiceUnitStateChannelImpl.java | 105 +++++++++++++++--- .../extensions/BrokerRegistryTest.java | 33 ++++++ .../ExtensibleLoadManagerImplTest.java | 26 ++++- .../channel/ServiceUnitStateChannelTest.java | 75 +++++++++---- .../apache/pulsar/client/admin/Brokers.java | 12 +- .../client/admin/internal/BrokersImpl.java | 18 ++- 10 files changed, 310 insertions(+), 63 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 10b15f6175f1f3..b20312226e789b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -48,6 +48,7 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService.State; @@ -368,20 +369,26 @@ public void isReady(@Suspended AsyncResponse asyncResponse) { @ApiOperation(value = "Run a healthCheck against the broker") @ApiResponses(value = { @ApiResponse(code = 200, message = "Everything is OK"), + @ApiResponse(code = 307, message = "Current broker is not the target broker"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")}) public void healthCheck(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Topic Version") - @QueryParam("topicVersion") TopicVersion topicVersion) { + @QueryParam("topicVersion") TopicVersion topicVersion, + @QueryParam("brokerId") String brokerId) { validateSuperUserAccessAsync() .thenAccept(__ -> checkDeadlockedThreads()) + .thenCompose(__ -> maybeRedirectToBroker( + StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId)) .thenCompose(__ -> internalRunHealthCheck(topicVersion)) .thenAccept(__ -> { LOG.info("[{}] Successfully run health check.", clientAppId()); asyncResponse.resume(Response.ok("ok").build()); }).exceptionally(ex -> { - LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + if (!isRedirectException(ex)) { + LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java index 79dba9c63342e5..d154edfbb320ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java @@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable { */ boolean isStarted(); + /** + * Return the broker has been registered. + */ + boolean isRegistered(); + /** * Register local broker to metadata store. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index c3cb8413b0128b..ef46aa8938fab7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -52,6 +52,8 @@ @Slf4j public class BrokerRegistryImpl implements BrokerRegistry { + private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000; + private final PulsarService pulsar; private final ServiceConfiguration conf; @@ -77,10 +79,11 @@ protected enum State { @VisibleForTesting final AtomicReference state = new AtomicReference<>(State.Init); - public BrokerRegistryImpl(PulsarService pulsar) { + @VisibleForTesting + BrokerRegistryImpl(PulsarService pulsar, MetadataCache brokerLookupDataMetadataCache) { this.pulsar = pulsar; this.conf = pulsar.getConfiguration(); - this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class); + this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache; this.scheduler = pulsar.getLoadManagerExecutor(); this.listeners = new ArrayList<>(); this.brokerIdKeyPath = keyPath(pulsar.getBrokerId()); @@ -98,6 +101,10 @@ public BrokerRegistryImpl(PulsarService pulsar) { pulsar.getBrokerVersion()); } + public BrokerRegistryImpl(PulsarService pulsar) { + this(pulsar, pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class)); + } + @Override public synchronized void start() throws PulsarServerException { if (!this.state.compareAndSet(State.Init, State.Started)) { @@ -117,6 +124,12 @@ public boolean isStarted() { return state == State.Started || state == State.Registered; } + @Override + public boolean isRegistered() { + final var state = this.state.get(); + return state == State.Registered; + } + @Override public CompletableFuture registerAsync() { final var state = this.state.get(); @@ -126,12 +139,35 @@ public CompletableFuture registerAsync() { } log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state); return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral)) - .thenAccept(__ -> { - this.state.set(State.Registered); - log.info("[{}] Finished registering self", getBrokerId()); + .orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS) + .whenComplete((__, ex) -> { + if (ex == null) { + this.state.set(State.Registered); + log.info("[{}] Finished registering self", getBrokerId()); + } else { + log.error("[{}] Failed registering self", getBrokerId(), ex); + } }); } + private void doRegisterAsyncWithRetries(int retry, CompletableFuture future) { + pulsar.getExecutor().schedule(() -> { + registerAsync().whenComplete((__, e) -> { + if (e != null) { + doRegisterAsyncWithRetries(retry + 1, future); + } else { + future.complete(null); + } + }); + }, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), TimeUnit.MILLISECONDS); + } + + private CompletableFuture registerAsyncWithRetries() { + var retryFuture = new CompletableFuture(); + doRegisterAsyncWithRetries(0, retryFuture); + return retryFuture; + } + @Override public synchronized void unregister() throws MetadataStoreException { if (state.compareAndSet(State.Registered, State.Unregistering)) { @@ -218,17 +254,26 @@ private void handleMetadataStoreNotification(Notification t) { // The registered node is an ephemeral node that could be deleted when the metadata store client's session // is expired. In this case, we should register again. final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1); + + CompletableFuture register; if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) { - registerAsync(); - } - if (listeners.isEmpty()) { - return; + this.state.set(State.Started); + register = registerAsyncWithRetries(); + } else { + register = CompletableFuture.completedFuture(null); } - this.scheduler.submit(() -> { - for (BiConsumer listener : listeners) { - listener.accept(brokerId, t.getType()); + // Make sure to run the listeners after re-registered. + register.thenAccept(__ -> { + if (listeners.isEmpty()) { + return; } + this.scheduler.submit(() -> { + for (BiConsumer listener : listeners) { + listener.accept(brokerId, t.getType()); + } + }); }); + } catch (RejectedExecutionException e) { // Executor is shutting down } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index bf98df2c5ac8f9..a1f01ea00beb01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -36,8 +36,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1007,8 +1009,12 @@ protected void monitor() { return; } + // Monitor broker registry + // Periodically check the broker registry in case metadata store fails. + validateBrokerRegistry(); + // Monitor role - // Periodically check the role in case ZK watcher fails. + // Periodically check the role in case metadata store fails. var isChannelOwner = serviceUnitStateChannel.isChannelOwner(); if (isChannelOwner) { // System topic config might fail due to the race condition @@ -1093,4 +1099,15 @@ boolean running() { private boolean disabled() { return state.get() == State.DISABLED; } + + private void validateBrokerRegistry() + throws ExecutionException, InterruptedException, TimeoutException { + var timeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + var lookup = brokerRegistry.lookupAsync(brokerRegistry.getBrokerId()).get(timeout, TimeUnit.SECONDS); + if (lookup.isEmpty()) { + log.warn("Found this broker:{} has not registered yet. Trying to register it", + brokerRegistry.getBrokerId()); + brokerRegistry.registerAsync().get(timeout, TimeUnit.SECONDS); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 1f2715a00acfdd..58f2e6f30f4d4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -87,6 +87,7 @@ import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -98,6 +99,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.FutureUtil; @@ -121,6 +123,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10; private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500; private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000; + private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3; + private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000; private final PulsarService pulsar; private final ServiceConfiguration config; private final Schema schema; @@ -128,6 +132,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final String brokerId; private final Map> cleanupJobs; private final StateChangeListeners stateChangeListeners; + private BrokerRegistry brokerRegistry; private LeaderElectionService leaderElectionService; private TableView tableview; @@ -267,7 +272,7 @@ public void cancelOwnershipMonitor() { @Override public void cleanOwnerships() { disable(); - doCleanup(brokerId); + doCleanup(brokerId, true); } @Override @@ -383,6 +388,12 @@ protected LeaderElectionService getLeaderElectionService() { .get().getLeaderElectionService(); } + @VisibleForTesting + protected PulsarAdmin getPulsarAdmin() throws PulsarServerException { + return pulsar.getAdminClient(); + } + + @Override public synchronized void close() throws PulsarServerException { channelState = Closed; boolean debug = debug(); @@ -491,6 +502,14 @@ private CompletableFuture> getActiveOwnerAsync( String serviceUnit, ServiceUnitState state, Optional owner) { + + // If this broker's registry does not exist(possibly suffering from connecting to the metadata store), + // we return the owner without its activeness check. + // This broker tries to serve lookups on a best efforts basis when metadata store connection is unstable. + if (!brokerRegistry.isRegistered()) { + return CompletableFuture.completedFuture(owner); + } + return dedupeGetOwnerRequest(serviceUnit) .thenCompose(newOwner -> { if (newOwner == null) { @@ -1259,19 +1278,25 @@ private MetadataState getMetadataState() { } private void handleBrokerCreationEvent(String broker) { - CompletableFuture future = cleanupJobs.remove(broker); - if (future != null) { - future.cancel(false); - totalInactiveBrokerCleanupCancelledCnt++; - log.info("Successfully cancelled the ownership cleanup for broker:{}." - + " Active cleanup job count:{}", - broker, cleanupJobs.size()); - } else { - if (debug()) { - log.info("No needs to cancel the ownership cleanup for broker:{}." - + " There was no scheduled cleanup job. Active cleanup job count:{}", - broker, cleanupJobs.size()); - } + + if (!cleanupJobs.isEmpty() && cleanupJobs.containsKey(broker)) { + healthCheckBrokerAsync(broker) + .thenAccept(__ -> { + CompletableFuture future = cleanupJobs.remove(broker); + if (future != null) { + future.cancel(false); + totalInactiveBrokerCleanupCancelledCnt++; + log.info("Successfully cancelled the ownership cleanup for broker:{}." + + " Active cleanup job count:{}", + broker, cleanupJobs.size()); + } else { + if (debug()) { + log.info("No needs to cancel the ownership cleanup for broker:{}." + + " There was no scheduled cleanup job. Active cleanup job count:{}", + broker, cleanupJobs.size()); + } + } + }); } } @@ -1318,7 +1343,7 @@ private void scheduleCleanup(String broker, long delayInSecs) { var future = CompletableFuture .runAsync(() -> { try { - doCleanup(broker); + doCleanup(broker, false); } catch (Throwable e) { log.error("Failed to run the cleanup job for the broker {}, " + "totalCleanupErrorCnt:{}.", @@ -1422,7 +1447,38 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max System.currentTimeMillis() - started); } - private synchronized void doCleanup(String broker) { + private CompletableFuture healthCheckBrokerAsync(String brokerId) { + CompletableFuture future = new CompletableFuture<>(); + doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future); + return future; + } + + private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture future) { + try { + var admin = getPulsarAdmin(); + admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId)) + .whenComplete((__, e) -> { + if (e == null) { + log.info("Completed health-check broker :{}", brokerId, e); + future.complete(null); + return; + } + if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) { + log.error("Failed health-check broker :{}", brokerId, e); + future.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + } else { + pulsar.getExecutor() + .schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future), + Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50), + MILLISECONDS); + } + }); + } catch (PulsarServerException e) { + future.completeExceptionally(e); + } + } + + private synchronized void doCleanup(String broker, boolean gracefully) { try { if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS) .isEmpty()) { @@ -1435,6 +1491,23 @@ private synchronized void doCleanup(String broker) { return; } + // if not gracefully, verify the broker is inactive by health-check. + if (!gracefully) { + try { + healthCheckBrokerAsync(broker).get( + pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + log.warn("Found that the broker to clean is healthy. Skip the broker:{}'s orphan bundle cleanup", + broker); + return; + } catch (Exception e) { + if (debug()) { + log.info("Failed to check broker:{} health", broker, e); + } + log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker); + } + } + + long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 28a2a18500f5fb..941d0e4cbc3a02 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -19,7 +19,10 @@ package org.apache.pulsar.broker.loadbalance.extensions; import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -36,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -48,6 +52,7 @@ import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; @@ -396,6 +401,34 @@ public void testKeyPath() { assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId"); } + @Test + public void testRegisterAsyncTimeout() throws Exception { + var pulsar1 = createPulsarService(); + pulsar1.start(); + pulsar1.getConfiguration().setMetadataStoreOperationTimeoutSeconds(1); + var metadataCache = mock(MetadataCache.class); + var brokerRegistry = new BrokerRegistryImpl(pulsar1, metadataCache); + + // happy case + doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any()); + brokerRegistry.start(); + + // unhappy case (timeout) + doAnswer(invocationOnMock -> { + return CompletableFuture.supplyAsync(() -> null, CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS)); + }).when(metadataCache).put(any(), any(), any()); + try { + brokerRegistry.registerAsync().join(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + + // happy case again + doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any()); + brokerRegistry.registerAsync().join(); + } + + private static BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) { return brokerRegistry.state.get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 7580c165a507de..ae775461150d17 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.loadbalance.extensions; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing; -import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelTest.overrideTableView; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate; @@ -132,6 +131,7 @@ import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; @@ -457,14 +457,17 @@ public boolean test(NamespaceBundle namespaceBundle) { assertEquals(unloadCount.get(), 0); }); - ServiceUnitStateChannelImpl channel = new ServiceUnitStateChannelImpl(pulsar1); - channel.start(); + @Cleanup + ServiceUnitStateChannelImpl channel3 = new ServiceUnitStateChannelImpl(pulsar1); + channel3.start(); + @Cleanup + ServiceUnitStateChannelImpl channel4 = new ServiceUnitStateChannelImpl(pulsar2); + channel4.start(); Awaitility.await().untilAsserted(() -> { assertEquals(onloadCount.get(), 2); assertEquals(unloadCount.get(), 0); }); - channel.close(); } @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") @@ -1876,7 +1879,6 @@ public void unloadTimeoutCheckTest() String topic = topicAndBundle.getLeft().toString(); var bundle = topicAndBundle.getRight().toString(); var releasing = new ServiceUnitStateData(Releasing, pulsar2.getBrokerId(), pulsar1.getBrokerId(), 1); - overrideTableView(channel1, bundle, releasing); var topicFuture = pulsar1.getBrokerService().getOrCreateTopic(topic); @@ -1895,6 +1897,20 @@ public void unloadTimeoutCheckTest() TimeUnit.SECONDS).get(2, TimeUnit.SECONDS); } + @Test(timeOut = 30 * 1000) + public void testMonitorBrokerRegistry() throws MetadataStoreException { + primaryLoadManager.getBrokerRegistry().unregister(); + assertFalse(primaryLoadManager.getBrokerRegistry().isRegistered()); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { // wait until true + primaryLoadManager.monitor(); + assertTrue(primaryLoadManager.getBrokerRegistry().isRegistered()); + }); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index af42649436bd4e..a110746b2292e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -90,6 +90,8 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; +import org.apache.pulsar.client.admin.Brokers; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; @@ -129,8 +131,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { private BrokerRegistryImpl registry; + private PulsarAdmin pulsarAdmin; + private ExtensibleLoadManagerImpl loadManager; + private Brokers brokers; + @BeforeClass @Override protected void setup() throws Exception { @@ -146,7 +152,9 @@ protected void setup() throws Exception { admin.namespaces().createNamespace("public/default"); pulsar1 = pulsar; - registry = new BrokerRegistryImpl(pulsar); + registry = spy(new BrokerRegistryImpl(pulsar1)); + registry.start(); + pulsarAdmin = spy(pulsar.getAdminClient()); loadManagerContext = mock(LoadManagerContext.class); doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore(); doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore(); @@ -177,6 +185,10 @@ protected void setup() throws Exception { childBundle31 = "public/default3/" + childBundle1Range; childBundle32 = "public/default3/" + childBundle2Range; + + brokers = mock(Brokers.class); + doReturn(CompletableFuture.failedFuture(new RuntimeException("failed"))).when(brokers) + .healthcheckAsync(any(), any()); } @BeforeMethod @@ -689,17 +701,18 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { @Test(priority = 8) public void handleBrokerCreationEventTest() throws IllegalAccessException { var cleanupJobs = getCleanupJobs(channel1); - String broker = "broker-1"; + String broker = brokerId2; var future = new CompletableFuture(); cleanupJobs.put(broker, future); - channel1.handleBrokerRegistrationEvent(broker, NotificationType.Created); - assertEquals(0, cleanupJobs.size()); - assertTrue(future.isCancelled()); + ((ServiceUnitStateChannelImpl) channel1).handleBrokerRegistrationEvent(broker, NotificationType.Created); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, cleanupJobs.size()); + assertTrue(future.isCancelled()); + }); } @Test(priority = 9) - public void handleBrokerDeletionEventTest() - throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { + public void handleBrokerDeletionEventTest() throws Exception { var cleanupJobs1 = getCleanupJobs(channel1); var cleanupJobs2 = getCleanupJobs(channel2); @@ -752,6 +765,8 @@ public void handleBrokerDeletionEventTest() System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); leaderChannel.handleBrokerRegistrationEvent(brokerId2, NotificationType.Deleted); @@ -809,6 +824,7 @@ public void handleBrokerDeletionEventTest() 3, 0, 0); + reset(pulsarAdmin); // broker is back online leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created); @@ -833,6 +849,7 @@ public void handleBrokerDeletionEventTest() // broker is offline again + doReturn(brokers).when(pulsarAdmin).brokers(); FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3, true); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); @@ -874,6 +891,7 @@ public void handleBrokerDeletionEventTest() 4, 0, 1); + reset(pulsarAdmin); // test unstable state channel1.publishUnloadEventAsync(new Unload(brokerId2, bundle1, Optional.of(broker))); @@ -1540,9 +1558,12 @@ public void testOverrideInactiveBrokerStateData() System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); FieldUtils.writeDeclaredField(followerChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + waitUntilNewOwner(channel2, releasingBundle, brokerId2); waitUntilNewOwner(channel2, childBundle11, brokerId2); waitUntilNewOwner(channel2, childBundle12, brokerId2); @@ -1558,7 +1579,7 @@ public void testOverrideInactiveBrokerStateData() // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); cleanTableViews(); - + reset(pulsarAdmin); } @Test(priority = 18) @@ -1675,7 +1696,7 @@ public void testActiveGetOwner() throws Exception { // case 3: the bundle ownership is transferring, and the dst broker is the channel owner overrideTableViews(bundle, new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1)); - assertTrue(!channel1.getOwnerAsync(bundle).isDone()); + assertFalse(channel1.getOwnerAsync(bundle).isDone()); // case 4: the bundle ownership is found overrideTableViews(bundle, @@ -1684,18 +1705,15 @@ public void testActiveGetOwner() throws Exception { assertEquals(owner, broker); // case 5: the owner lookup gets delayed - var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); - FieldUtils.writeDeclaredField(channel1, - "brokerRegistry", spyRegistry , true); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1000, true); var delayedFuture = new CompletableFuture(); - doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker)); + doReturn(delayedFuture).when(registry).lookupAsync(eq(broker)); CompletableFuture.runAsync(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { - Thread.currentThread().interrupt();; + Thread.currentThread().interrupt(); } delayedFuture.complete(Optional.of(broker)); }); @@ -1708,7 +1726,7 @@ public void testActiveGetOwner() throws Exception { // case 6: the owner is inactive doReturn(CompletableFuture.completedFuture(Optional.empty())) - .when(spyRegistry).lookupAsync(eq(broker)); + .when(registry).lookupAsync(eq(broker)); // verify getOwnerAsync times out start = System.currentTimeMillis(); @@ -1716,19 +1734,32 @@ public void testActiveGetOwner() throws Exception { assertTrue(ex.getCause() instanceof IllegalStateException); assertTrue(System.currentTimeMillis() - start >= 1000); + try { + // verify getOwnerAsync returns immediately when not registered + registry.unregister(); + start = System.currentTimeMillis(); + assertEquals(broker, channel1.getOwnerAsync(bundle).get().get()); + elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed < 1000); + } finally { + registry.registerAsync().join(); + } + + // case 7: the ownership cleanup(no new owner) by the leader channel doReturn(CompletableFuture.completedFuture(Optional.empty())) .when(loadManager).selectAsync(any(), any(), any()); - var leaderChannel = channel1; + ServiceUnitStateChannelImpl leaderChannel = (ServiceUnitStateChannelImpl) channel1; String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); assertEquals(leader1, leader2); if (leader1.equals(brokerId2)) { - leaderChannel = channel2; + leaderChannel = (ServiceUnitStateChannelImpl) channel2; } leaderChannel.handleMetadataSessionEvent(SessionReestablished); FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout @@ -1740,7 +1771,7 @@ public void testActiveGetOwner() throws Exception { waitUntilState(channel2, bundle, Init); assertTrue(System.currentTimeMillis() - start < 20_000); - + reset(pulsarAdmin); // case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); @@ -1750,6 +1781,7 @@ public void testActiveGetOwner() throws Exception { FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); getCleanupJobs(leaderChannel).clear(); + doReturn(brokers).when(pulsarAdmin).brokers(); leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); // verify the ownership cleanup, and channel's getOwnerAsync returns brokerId1 without timeout @@ -1760,10 +1792,8 @@ public void testActiveGetOwner() throws Exception { // test clean-up FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); - FieldUtils.writeDeclaredField(channel1, - "brokerRegistry", registry , true); cleanTableViews(); - + reset(pulsarAdmin); } @Test(priority = 20) @@ -2104,7 +2134,7 @@ private static void validateMonitorCounters(ServiceUnitStateChannel channel, } ServiceUnitStateChannelImpl createChannel(PulsarService pulsar) - throws IllegalAccessException { + throws IllegalAccessException, PulsarServerException { var tmpChannel = new ServiceUnitStateChannelImpl(pulsar); FieldUtils.writeDeclaredField(tmpChannel, "ownershipMonitorDelayTimeInSecs", 5, true); var channel = spy(tmpChannel); @@ -2112,6 +2142,7 @@ ServiceUnitStateChannelImpl createChannel(PulsarService pulsar) doReturn(loadManagerContext).when(channel).getContext(); doReturn(registry).when(channel).getBrokerRegistry(); doReturn(loadManager).when(channel).getLoadManager(); + doReturn(pulsarAdmin).when(channel).getPulsarAdmin(); var leaderElectionService = new LeaderElectionService( diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java index dc0b7c9885a9a5..eed73f38282ac1 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -320,10 +321,19 @@ Map getOwnedNamespaces(String cluster, String */ void healthcheck(TopicVersion topicVersion) throws PulsarAdminException; + /** + * Run a healthcheck on the target broker or on the broker. + * @param brokerId target broker id to check the health. If empty, it checks the health on the connected broker. + * + * @throws PulsarAdminException if the healthcheck fails. + */ + void healthcheck(TopicVersion topicVersion, Optional brokerId) throws PulsarAdminException; + /** * Run a healthcheck on the broker asynchronously. */ - CompletableFuture healthcheckAsync(TopicVersion topicVersion); + CompletableFuture healthcheckAsync(TopicVersion topicVersion, Optional brokerId); + /** * Trigger the current broker to graceful-shutdown asynchronously. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index b82c3fd0f414bb..35b261b196eeeb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.ws.rs.client.Entity; import javax.ws.rs.client.InvocationCallback; @@ -168,26 +169,35 @@ public CompletableFuture backlogQuotaCheckAsync() { @Override @Deprecated public void healthcheck() throws PulsarAdminException { - healthcheck(TopicVersion.V1); + healthcheck(TopicVersion.V1, Optional.empty()); } @Override @Deprecated public CompletableFuture healthcheckAsync() { - return healthcheckAsync(TopicVersion.V1); + return healthcheckAsync(TopicVersion.V1, Optional.empty()); } + @Override public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException { - sync(() -> healthcheckAsync(topicVersion)); + sync(() -> healthcheckAsync(topicVersion, Optional.empty())); } @Override - public CompletableFuture healthcheckAsync(TopicVersion topicVersion) { + public void healthcheck(TopicVersion topicVersion, Optional brokerId) throws PulsarAdminException { + sync(() -> healthcheckAsync(topicVersion, brokerId)); + } + + @Override + public CompletableFuture healthcheckAsync(TopicVersion topicVersion, Optional brokerId) { WebTarget path = adminBrokers.path("health"); if (topicVersion != null) { path = path.queryParam("topicVersion", topicVersion); } + if (brokerId.isPresent()) { + path = path.queryParam("brokerId", brokerId.get()); + } final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() {