Skip to content

Commit

Permalink
[improve][broker] re-elect the channel owner if no channel owner is found (#23516)
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn authored Oct 28, 2024
1 parent ebb3cb5 commit 266e705
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,22 @@ public static boolean isInternalTopic(String topic) {
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

/**
 * Tries to recover from a "no channel owner now" failure by starting the
 * LeaderElectionService again and re-checking the channel owner.
 *
 * @param e the failure raised while querying the channel ownership; it is unwrapped with
 *          {@code FutureUtil.unwrapCompletionException} before its message is inspected
 * @return {@code true} if the failure was a "no channel owner now" error and a channel owner
 *         is present after the LeaderElectionService was started again; {@code false} if the
 *         failure is of any other kind (including one without a message) or if the owner
 *         lookup is still empty afterwards
 */
private boolean handleNoChannelOwnerError(Throwable e) {
    // Throwable#getMessage() is allowed to return null. Without these guards an unrelated
    // failure that carries no message would raise a NullPointerException here and hide the
    // original error that the callers rethrow when this method returns false.
    Throwable unwrapped = FutureUtil.unwrapCompletionException(e);
    String message = unwrapped == null ? null : unwrapped.getMessage();
    if (message == null || !message.contains("no channel owner now")) {
        return false;
    }

    var leaderElectionService = getLeaderElectionService();
    log.warn("No channel owner is found. Trying to start LeaderElectionService again.");
    leaderElectionService.start();

    // Blocks until the owner lookup completes so the caller can retry right away.
    var channelOwner = serviceUnitStateChannel.getChannelOwnerAsync().join();
    if (channelOwner.isEmpty()) {
        log.error("Still no Leader is found even after LeaderElectionService restarted.");
        return false;
    }
    log.info("Successfully started LeaderElectionService. The new channel owner is {}", channelOwner);
    return true;
}

@VisibleForTesting
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
Expand All @@ -837,10 +853,19 @@ synchronized void playLeader() {
if (!initWaiter.get() || disabled()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
try {
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
} catch (Throwable e) {
if (handleNoChannelOwnerError(e)) {
continue;
} else {
throw e;
}
}

if (disabled()) {
return;
}
Expand Down Expand Up @@ -903,10 +928,19 @@ synchronized void playFollower() {
if (!initWaiter.get() || disabled()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
try {
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
} catch (Throwable e) {
if (handleNoChannelOwnerError(e)) {
continue;
} else {
throw e;
}
}

if (disabled()) {
return;
}
Expand Down Expand Up @@ -995,7 +1029,17 @@ protected void monitor() {

// Monitor role
// Periodically check the role in case metadata store fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();

boolean isChannelOwner = false;
try {
isChannelOwner = serviceUnitStateChannel.isChannelOwner();
} catch (Throwable e) {
if (handleNoChannelOwnerError(e)) {
monitor();
} else {
throw e;
}
}
if (isChannelOwner) {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
Expand Down Expand Up @@ -1023,7 +1067,7 @@ protected void monitor() {
serviceUnitStateTableViewSyncer.close();
}
} catch (Throwable e) {
log.error("Failed to get the channel ownership.", e);
log.error("Failed to monitor load manager state", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1491,16 +1491,16 @@ private void makePrimaryAsLeader() throws Exception {
log.info("makePrimaryAsLeader");
if (channel2.isChannelOwner()) {
pulsar2.getLeaderElectionService().close();
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
pulsar2.getLeaderElectionService().start();
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertFalse(channel2.isChannelOwner());
});
}
Expand Down Expand Up @@ -1605,6 +1605,82 @@ public void testRoleChangeIdempotency() throws Exception {


}

/**
 * Supplies the two variants of {@code testHandleNoChannelOwner}: {@code true} recovers the
 * channel owner through {@code monitor()}, {@code false} through
 * {@code playLeader()}/{@code playFollower()}.
 */
@DataProvider(name = "noChannelOwnerMonitorHandler")
public Object[][] noChannelOwnerMonitorHandler() {
    Object[] recoverViaMonitor = { true };
    Object[] recoverViaRoleChange = { false };
    return new Object[][] { recoverViaMonitor, recoverViaRoleChange };
}

/**
 * Checks that a channel owner is elected again after both brokers have lost it.
 *
 * <p>The test first puts the primary broker in the Leader role and the secondary in the
 * Follower role, then closes the LeaderElectionService on each broker until
 * {@code isChannelOwner()} fails with "no channel owner now". Recovery is then triggered on
 * the secondary first, and the secondary is expected to end up as Leader and channel owner.
 *
 * @param noChannelOwnerMonitorHandler {@code true} to trigger the recovery through
 *                                     {@code monitor()}, {@code false} to trigger it through
 *                                     {@code playLeader()}/{@code playFollower()}
 */
@Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101)
public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler) throws Exception {

    // Baseline: primary leads, secondary follows.
    makePrimaryAsLeader();
    primaryLoadManager.playLeader();
    secondaryLoadManager.playFollower();

    assertEquals(ExtensibleLoadManagerImpl.Role.Leader, primaryLoadManager.getRole());
    assertEquals(ExtensibleLoadManagerImpl.Role.Follower, secondaryLoadManager.getRole());

    try {

        // Simulate "no owner in the channel" on the primary: keep closing its election
        // service until the ownership check fails with the expected error.
        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
            try {
                pulsar1.getLeaderElectionService().close();
                primaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
            } catch (ExecutionException e) {
                return e.getCause() instanceof IllegalStateException
                        && e.getMessage().contains("no channel owner now");
            }
            // The ownership check still succeeded, so an owner is still known.
            return false;
        });

        // Same simulation on the secondary.
        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
            try {
                pulsar2.getLeaderElectionService().close();
                secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
            } catch (ExecutionException e) {
                return e.getCause() instanceof IllegalStateException
                        && e.getMessage().contains("no channel owner now");
            }
            return false;
        });

        // Elect a new channel owner, either through monitor() or through
        // playLeader()/playFollower(). The secondary goes first in both variants.
        if (noChannelOwnerMonitorHandler) {
            secondaryLoadManager.monitor();
            primaryLoadManager.monitor();
        } else {
            secondaryLoadManager.playLeader();
            primaryLoadManager.playFollower();
        }

        // Roles and channel ownership must now be swapped compared to the baseline.
        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
            assertEquals(ExtensibleLoadManagerImpl.Role.Leader, secondaryLoadManager.getRole());
            assertEquals(ExtensibleLoadManagerImpl.Role.Follower, primaryLoadManager.getRole());

            assertTrue(secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
            assertFalse(primaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
        });

    } finally {
        // Clean up for the monitor test: start both election services again.
        pulsar1.getLeaderElectionService().start();
        pulsar2.getLeaderElectionService().start();
    }
}

@Test(timeOut = 30 * 1000, priority = 2000)
public void testRoleChange() throws Exception {
makePrimaryAsLeader();
Expand Down

0 comments on commit 266e705

Please sign in to comment.