Skip to content

Commit

Permalink
[fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookee…
Browse files Browse the repository at this point in the history
…per event thread (#17620)
  • Loading branch information
lhotari authored and Technoboy- committed Sep 26, 2022
1 parent 1c63ee3 commit b121d85
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.versioning.Version;
Expand All @@ -46,6 +49,7 @@ public class PulsarRegistrationClient implements RegistrationClient {

private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
private final ScheduledExecutorService executor;

public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
Expand All @@ -60,11 +64,15 @@ public PulsarRegistrationClient(MetadataStore store,
this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;

this.executor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));

store.registerListener(this::updatedBookies);
}

@Override
public void close() {
executor.shutdownNow();
}

@Override
Expand Down Expand Up @@ -99,7 +107,7 @@ private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {
public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
return getWritableBookies()
.thenAccept(registrationListener::onBookiesChanged);
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}

@Override
Expand All @@ -111,7 +119,7 @@ public void unwatchWritableBookies(RegistrationListener registrationListener) {
public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
return getReadOnlyBookies()
.thenAccept(registrationListener::onBookiesChanged);
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}

@Override
Expand All @@ -124,11 +132,11 @@ private void updatedBookies(Notification n) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies ->
readOnlyBookiesWatchers.keySet()
.forEach(w -> w.onBookiesChanged(bookies)));
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.keySet()
.forEach(w -> w.onBookiesChanged(bookies)));
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
}
}
}
Expand Down

0 comments on commit b121d85

Please sign in to comment.