Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] timeout when broker registry hangs and monitor broker registry (ExtensibleLoadManagerImpl only) #23382

Merged
merged 6 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable {
*/
boolean isStarted();

/**
* Return whether the broker has been registered.
*/
boolean isRegistered();

/**
* Register local broker to metadata store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ protected enum State {
@VisibleForTesting
final AtomicReference<State> state = new AtomicReference<>(State.Init);

public BrokerRegistryImpl(PulsarService pulsar) {
@VisibleForTesting
BrokerRegistryImpl(PulsarService pulsar, MetadataCache<BrokerLookupData> brokerLookupDataMetadataCache) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache;
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
Expand All @@ -99,6 +100,10 @@ public BrokerRegistryImpl(PulsarService pulsar) {
pulsar.getConfig().lookupProperties());
}

/**
* Creates a broker registry for the given Pulsar service.
*
* <p>Delegates to the two-argument constructor, passing the {@code BrokerLookupData} metadata cache
* obtained from the service's local metadata store.
*
* @param pulsar the Pulsar service this registry belongs to
*/
public BrokerRegistryImpl(PulsarService pulsar) {
this(pulsar, pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class));
}

@Override
public synchronized void start() throws PulsarServerException {
if (!this.state.compareAndSet(State.Init, State.Started)) {
Expand All @@ -118,6 +123,12 @@ public boolean isStarted() {
return state == State.Started || state == State.Registered;
}

/**
 * Reports whether the registry's current state is {@code State.Registered}.
 *
 * @return {@code true} only when the state read at call time is {@code Registered}
 */
@Override
public boolean isRegistered() {
    // Single atomic read of the state reference; no intermediate local needed.
    return State.Registered == this.state.get();
}

@Override
public CompletableFuture<Void> registerAsync() {
final var state = this.state.get();
Expand All @@ -127,9 +138,14 @@ public CompletableFuture<Void> registerAsync() {
}
log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state);
return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.thenAccept(__ -> {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
.orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
.whenComplete((__, ex) -> {
if (ex == null) {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
} else {
log.error("[{}] Failed registering self", getBrokerId(), ex);
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -987,8 +989,12 @@ protected void monitor() {
return;
}

// Monitor broker registry
// Periodically check the broker registry in case metadata store fails.
validateBrokerRegistry();

// Monitor role
// Periodically check the role in case ZK watcher fails.
// Periodically check the role in case metadata store fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
Expand Down Expand Up @@ -1087,5 +1093,15 @@ private boolean isPersistentSystemTopicUsed() {
.equals(pulsar.getConfiguration().getLoadManagerServiceUnitStateTableViewClassName());
}

/**
 * Looks up this broker in the broker registry and, when no entry is found, registers it again.
 *
 * <p>Both the lookup and the registration are awaited synchronously, each bounded by the configured
 * metadata store operation timeout (in seconds).
 *
 * @throws ExecutionException if the lookup or the registration future completes exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws TimeoutException if the lookup or the registration does not finish within the timeout
 */
private void validateBrokerRegistry()
        throws ExecutionException, InterruptedException, TimeoutException {
    var timeoutSeconds = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
    var selfLookup = brokerRegistry.lookupAsync(brokerRegistry.getBrokerId())
            .get(timeoutSeconds, TimeUnit.SECONDS);
    if (selfLookup.isPresent()) {
        // The registry still has this broker's entry; nothing to repair.
        return;
    }
    log.warn("Found this broker:{} has not registered yet. Trying to register it",
            brokerRegistry.getBrokerId());
    brokerRegistry.registerAsync().get(timeoutSeconds, TimeUnit.SECONDS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.pulsar.broker.loadbalance.extensions;

import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand All @@ -36,6 +39,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -48,6 +52,7 @@
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
Expand Down Expand Up @@ -396,6 +401,34 @@ public void testKeyPath() {
assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId");
}

/**
 * Verifies that {@code registerAsync()} fails with a {@link TimeoutException} when the metadata
 * cache {@code put} does not complete within the configured metadata store operation timeout,
 * and that a subsequent registration succeeds once {@code put} completes promptly again.
 *
 * @throws Exception if creating or starting the Pulsar service or the registry fails
 */
@Test
public void testRegisterAsyncTimeout() throws Exception {
    var pulsar1 = createPulsarService();
    pulsar1.start();
    pulsar1.getConfiguration().setMetadataStoreOperationTimeoutSeconds(1);
    var metadataCache = mock(MetadataCache.class);
    var brokerRegistry = new BrokerRegistryImpl(pulsar1, metadataCache);

    // happy case
    doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any());
    brokerRegistry.start();

    // unhappy case (timeout): put() only completes after 5s, well past the 1s operation timeout
    doAnswer(invocationOnMock -> {
        return CompletableFuture.supplyAsync(() -> null, CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS));
    }).when(metadataCache).put(any(), any(), any());
    Throwable failureCause = null;
    try {
        brokerRegistry.registerAsync().join();
    } catch (Exception e) {
        failureCause = e.getCause();
    }
    // Assert outside the catch block: if join() returned normally, failureCause stays null and
    // this fails, instead of the test silently passing without ever observing a timeout.
    assertTrue(failureCause instanceof TimeoutException,
            "registerAsync() should fail with a TimeoutException, but the cause was: " + failureCause);

    // happy case again
    doReturn(CompletableFuture.completedFuture(null)).when(metadataCache).put(any(), any(), any());
    brokerRegistry.registerAsync().join();
}


/**
 * Reads the current value of the given registry's {@code state} reference.
 *
 * @param brokerRegistry the registry whose state is inspected
 * @return the state held by the registry at the time of the call
 */
private static BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) {
    final var stateRef = brokerRegistry.state;
    return stateRef.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
Expand Down Expand Up @@ -2106,6 +2107,20 @@ public void compactionScheduleTest() {
});
}

/**
 * Unregisters the primary load manager's broker, checks that the registry no longer reports it as
 * registered, then repeatedly runs {@code monitor()} until the registry reports it as registered
 * again.
 *
 * @throws MetadataStoreException declared because {@code unregister()} may throw it
 */
@Test(timeOut = 30 * 1000)
public void testMonitorBrokerRegistry() throws MetadataStoreException {
    primaryLoadManager.getBrokerRegistry().unregister();
    assertFalse(primaryLoadManager.getBrokerRegistry().isRegistered());

    // Poll every 200ms for up to 30s; exceptions thrown while polling are ignored so the
    // monitor-then-assert step is simply retried.
    final var retryUntilRegistered = Awaitility.await()
            .atMost(30, TimeUnit.SECONDS)
            .pollInterval(200, TimeUnit.MILLISECONDS)
            .ignoreExceptions();
    retryUntilRegistered.untilAsserted(() -> { // wait until true
        primaryLoadManager.monitor();
        assertTrue(primaryLoadManager.getBrokerRegistry().isRegistered());
    });
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down
Loading