Skip to content

Commit

Permalink
[improve][broker] re-elect the channel owner if no channel owner is found (#23516)
Browse files Browse the repository at this point in the history

(cherry picked from commit 266e705)
  • Loading branch information
heesung-sn committed Nov 9, 2024
1 parent d41077d commit 22bede8
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,22 @@ public static boolean isInternalTopic(String topic) {
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

/**
 * Tries to recover from a "no channel owner now" failure by starting the
 * leader election service again and re-reading the channel owner.
 *
 * <p>Any throwable whose unwrapped message does not contain
 * {@code "no channel owner now"} (including throwables with no message at all)
 * is treated as unrelated and left for the caller to handle.
 *
 * @param e the failure raised while checking channel ownership
 * @return {@code true} if the error was a "no channel owner" error and a channel
 *         owner is present after restarting leader election; {@code false} otherwise
 */
private boolean handleNoChannelOwnerError(Throwable e) {
    Throwable cause = FutureUtil.unwrapCompletionException(e);
    // Throwable#getMessage() may be null (e.g. a TimeoutException raised by a timed
    // Future.get() usually has no detail message). Dereferencing it blindly would
    // replace the original failure with a NullPointerException in the callers'
    // catch blocks, so treat "no message" as "not a no-channel-owner error".
    String message = cause == null ? null : cause.getMessage();
    if (message == null || !message.contains("no channel owner now")) {
        return false;
    }

    var leaderElectionService = getLeaderElectionService();
    log.warn("No channel owner is found. Trying to start LeaderElectionService again.");
    leaderElectionService.start();

    // Blocks until the channel owner lookup completes.
    var channelOwner = serviceUnitStateChannel.getChannelOwnerAsync().join();
    if (channelOwner.isEmpty()) {
        log.error("Still no Leader is found even after LeaderElectionService restarted.");
        return false;
    }
    log.info("Successfully started LeaderElectionService. The new channel owner is {}", channelOwner);
    return true;
}

@VisibleForTesting
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
Expand All @@ -861,10 +877,19 @@ synchronized void playLeader() {
if (!initWaiter.get() || disabled()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
try {
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
} catch (Throwable e) {
if (handleNoChannelOwnerError(e)) {
continue;
} else {
throw e;
}
}

if (disabled()) {
return;
}
Expand Down Expand Up @@ -924,10 +949,19 @@ synchronized void playFollower() {
if (!initWaiter.get() || disabled()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
try {
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
} catch (Throwable e) {
if (handleNoChannelOwnerError(e)) {
continue;
} else {
throw e;
}
}

if (disabled()) {
return;
}
Expand Down Expand Up @@ -1015,7 +1049,17 @@ protected void monitor() {

// Monitor role
// Periodically check the role in case metadata store fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();

boolean isChannelOwner = false;
try {
isChannelOwner = serviceUnitStateChannel.isChannelOwner();
} catch (Throwable e) {
if (handleNoChannelOwnerError(e)) {
monitor();
} else {
throw e;
}
}
if (isChannelOwner) {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
Expand All @@ -1035,7 +1079,7 @@ protected void monitor() {
}
}
} catch (Throwable e) {
log.error("Failed to get the channel ownership.", e);
log.error("Failed to monitor load manager state", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
Expand Down Expand Up @@ -73,7 +75,7 @@ public interface ServiceUnitStateChannel extends Closeable {
* Checks if the current broker is the owner broker of the system topic in this channel.
* @return True if the current broker is the owner. Otherwise, false.
*/
boolean isChannelOwner();
boolean isChannelOwner() throws ExecutionException, InterruptedException, TimeoutException;

/**
* Handles the metadata session events to track
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,15 +469,9 @@ public CompletableFuture<Boolean> isChannelOwnerAsync() {
});
}

public boolean isChannelOwner() {
try {
return isChannelOwnerAsync().get(
MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
String msg = "Failed to get the channel owner.";
log.error(msg, e);
throw new RuntimeException(msg, e);
}
/**
 * Synchronously checks whether this broker is the channel owner by waiting on
 * {@link #isChannelOwnerAsync()} for at most
 * {@code MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS} seconds.
 *
 * @return the value produced by the asynchronous ownership check
 * @throws ExecutionException if the asynchronous check completed exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws TimeoutException if the check does not complete within the waiting time
 */
public boolean isChannelOwner() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<Boolean> ownershipCheck = isChannelOwnerAsync();
    return ownershipCheck.get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS);
}

public boolean isOwner(String serviceUnit, String targetBroker) {
Expand Down Expand Up @@ -1610,8 +1604,13 @@ private CompletableFuture<Optional<String>> selectBroker(String serviceUnit, Str

@VisibleForTesting
protected void monitorOwnerships(List<String> brokers) {
if (!isChannelOwner()) {
log.warn("This broker is not the leader now. Skipping ownership monitor.");
try {
if (!isChannelOwner()) {
log.warn("This broker is not the leader now. Skipping ownership monitor.");
return;
}
} catch (Exception e) {
log.error("Failed to monitor ownerships", e);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelTest.overrideTableView;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
Expand Down Expand Up @@ -156,12 +157,12 @@ public ExtensibleLoadManagerImplTest() {
@Test
public void testAssignInternalTopic() throws Exception {
Optional<BrokerLookupData> brokerLookupData1 = primaryLoadManager.assign(
Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(),
Optional.of(TopicName.get(TOPIC)),
getBundleAsync(pulsar1, TopicName.get(TOPIC)).get(),
LookupOptions.builder().build()).get();
Optional<BrokerLookupData> brokerLookupData2 = secondaryLoadManager.assign(
Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
getBundleAsync(pulsar1, TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get(),
Optional.of(TopicName.get(TOPIC)),
getBundleAsync(pulsar1, TopicName.get(TOPIC)).get(),
LookupOptions.builder().build()).get();
assertEquals(brokerLookupData1, brokerLookupData2);
assertTrue(brokerLookupData1.isPresent());
Expand Down Expand Up @@ -1244,16 +1245,16 @@ private void makePrimaryAsLeader() throws Exception {
log.info("makePrimaryAsLeader");
if (channel2.isChannelOwner()) {
pulsar2.getLeaderElectionService().close();
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
pulsar2.getLeaderElectionService().start();
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertFalse(channel2.isChannelOwner());
});
}
Expand Down Expand Up @@ -1361,7 +1362,69 @@ public void testRoleChangeIdempotency() throws Exception {
topBundlesLoadDataStoreSecondary, true);
}
}
@Test(timeOut = 30 * 1000)

/**
 * Supplies the two recovery modes used by {@code testHandleNoChannelOwner}:
 * {@code true} drives the re-election through {@code monitor()}, {@code false}
 * drives it through {@code playLeader()} / {@code playFollower()}.
 */
@DataProvider(name = "noChannelOwnerMonitorHandler")
public Object[][] noChannelOwnerMonitorHandler() {
    Object[] recoverViaMonitor = {true};
    Object[] recoverViaRoleChange = {false};
    return new Object[][] {recoverViaMonitor, recoverViaRoleChange};
}

/**
 * Verifies that the load managers recover when the service unit state channel
 * reports that it has no owner.
 *
 * <p>Flow: make the primary broker the leader, close both brokers' leader election
 * services until {@code isChannelOwner()} fails with "no channel owner now", then
 * trigger recovery and expect the secondary to end up as Leader / channel owner and
 * the primary as Follower.
 *
 * @param noChannelOwnerMonitorHandler when {@code true}, recovery is triggered via
 *        {@code monitor()}; when {@code false}, via {@code playLeader()} /
 *        {@code playFollower()}
 */
@Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101)
public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler) throws Exception {

// Start from a known state: primary is the leader, secondary is the follower.
makePrimaryAsLeader();
primaryLoadManager.playLeader();
secondaryLoadManager.playFollower();

assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
primaryLoadManager.getRole());
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
secondaryLoadManager.getRole());

try {
// simulate no owner in the channel
// Both election services are closed on every poll; the poll only succeeds once
// an ownership check actually fails with the "no channel owner now" error.
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
try {
pulsar1.getLeaderElectionService().close();
pulsar2.getLeaderElectionService().close();
primaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
// Neither call threw, so an owner is still visible: keep polling.
return false;
} catch (ExecutionException e) {
// The cause type is checked on the wrapped exception, while the text is
// matched against the ExecutionException's own message.
if (e.getCause() instanceof IllegalStateException && e.getMessage()
.contains("no channel owner now")) {
return true;
} else {
return false;
}
}
});

// elect new channel owner by either monitor or playLeader/playFollower
// In both modes the secondary is invoked first, and the roles requested in the
// second mode are the reverse of the initial setup.
if (noChannelOwnerMonitorHandler) {
secondaryLoadManager.monitor();
primaryLoadManager.monitor();
} else {
secondaryLoadManager.playLeader();
primaryLoadManager.playFollower();
}
// NOTE(review): the secondary is expected to win the new election, presumably
// because it is triggered first above - confirm against the election logic.
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
secondaryLoadManager.getRole());
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
primaryLoadManager.getRole());

assertTrue(secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
assertFalse(primaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
});

} finally {
// clean up for monitor test
// Always restart both election services, even if the assertions above failed.
pulsar1.getLeaderElectionService().start();
pulsar2.getLeaderElectionService().start();
}
}

@Test(timeOut = 30 * 1000, priority = 2000)
public void testRoleChange() throws Exception {
makePrimaryAsLeader();

Expand All @@ -1379,14 +1442,12 @@ public void testRoleChange() throws Exception {
topBundlesExpected.getTopBundlesLoadData().clear();
topBundlesExpected.getTopBundlesLoadData().add(new TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats()));

follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected);

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {

assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(), "tableView", true));
assertNull(FieldUtils.readDeclaredField(follower.getTopBundlesLoadDataStore(), "tableView", true));


for (String internalTopic : ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
assertTrue(leader.pulsar.getBrokerService().getTopicReference(internalTopic)
.isPresent());
Expand All @@ -1398,22 +1459,9 @@ public void testRoleChange() throws Exception {
assertFalse(follower.pulsar.getNamespaceService()
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
}

var actualBrokerLoadLeader = leader.getBrokerLoadDataStore().get(key);
if (actualBrokerLoadLeader.isPresent()) {
assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);
}

var actualTopBundlesLeader = leader.getTopBundlesLoadDataStore().get(bundle);
if (actualTopBundlesLeader.isPresent()) {
assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);
}

var actualBrokerLoadFollower = follower.getBrokerLoadDataStore().get(key);
if (actualBrokerLoadFollower.isPresent()) {
assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected);
}
});
follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected).get(3, TimeUnit.SECONDS);
follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected).get(3, TimeUnit.SECONDS);

makeSecondaryAsLeader();

Expand All @@ -1423,9 +1471,6 @@ public void testRoleChange() throws Exception {
brokerLoadExpected.update(usage, 1, 0, 0, 0, 0, 0, conf);
topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 1;

follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
follower.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected);

Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(), "tableView", true));
assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), "tableView", true));
Expand All @@ -1441,17 +1486,10 @@ public void testRoleChange() throws Exception {
assertFalse(follower2.pulsar.getNamespaceService()
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
}


var actualBrokerLoadLeader = leader2.getBrokerLoadDataStore().get(key);
assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);

var actualTopBundlesLeader = leader2.getTopBundlesLoadDataStore().get(bundle);
assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);

var actualBrokerLoadFollower = follower2.getBrokerLoadDataStore().get(key);
assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected);
});

follower2.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected).get(3, TimeUnit.SECONDS);
follower2.getTopBundlesLoadDataStore().pushAsync(bundle, topBundlesExpected).get(3, TimeUnit.SECONDS);
}

@Test
Expand Down Expand Up @@ -1868,7 +1906,7 @@ public void compactionScheduleTest() {
primaryLoadManager.monitor();
secondaryLoadManager.monitor();
var threshold = admin.topicPolicies()
.getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false);
.getCompactionThreshold(TOPIC, false);
AssertJUnit.assertEquals(5 * 1024 * 1024, threshold == null ? 0 : threshold.longValue());
});
}
Expand Down

0 comments on commit 22bede8

Please sign in to comment.