diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index 78a33179e76b6..b7e3024b637fb 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; +import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -63,7 +64,57 @@ public byte[] serialize(String path, BookieServiceInfo bookieServiceInfo) throws } @Override - public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException { - return null; + public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { + // see https://github.com/apache/bookkeeper/blob/ + // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ + // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 + String bookieId = extractBookiedIdFromPath(path); + if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { + return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); + } + + BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); + BookieServiceInfo bsi = new BookieServiceInfo(); + List endpoints = builder.getEndpointsList().stream() + .map(e -> { + BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); + endpoint.setId(e.getId()); + endpoint.setPort(e.getPort()); + endpoint.setHost(e.getHost()); + endpoint.setProtocol(e.getProtocol()); + endpoint.setAuth(e.getAuthList()); + endpoint.setExtensions(e.getExtensionsList()); + return endpoint; + }) + .collect(Collectors.toList()); + + bsi.setEndpoints(endpoints); + bsi.setProperties(builder.getPropertiesMap()); + + return bsi; + + } + + /** + * Extract the BookieId + * The path should look like /ledgers/available/bookieId + * or /ledgers/available/readonly/bookieId. + * But the prefix depends on the configuration. + * @param path + * @return the bookieId + */ + private static String extractBookiedIdFromPath(String path) throws IOException { + // https://github.com/apache/bookkeeper/blob/ + // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ + // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 + if (path == null) { + path = ""; + } + int last = path.lastIndexOf("/"); + if (last >= 0) { + return path.substring(last + 1); + } else { + throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); + } } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 52b50e3ea4b08..f314c0efaf058 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -22,22 +22,33 @@ import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.discover.RegistrationClient; import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; +@Slf4j public class PulsarRegistrationClient implements RegistrationClient { private final MetadataStore store; @@ -47,14 +58,18 @@ public class PulsarRegistrationClient implements RegistrationClient { private final String bookieAllRegistrationPath; private final String bookieReadonlyRegistrationPath; + private final ConcurrentHashMap> bookieServiceInfoCache = + new ConcurrentHashMap(); private final Map writableBookiesWatchers = new ConcurrentHashMap<>(); private final Map readOnlyBookiesWatchers = new ConcurrentHashMap<>(); + private final MetadataCache bookieServiceInfoMetadataCache; private final ScheduledExecutorService executor; public PulsarRegistrationClient(MetadataStore store, String ledgersRootPath) { this.store = store; this.ledgersRootPath = ledgersRootPath; + this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE); // Following Bookie Network Address Changes is an expensive operation // as it requires additional ZooKeeper watches @@ -99,7 +114,25 @@ public CompletableFuture>> getReadOnlyBookies() { private CompletableFuture>> getChildren(String path) { return store.getChildren(path) - .thenApply(PulsarRegistrationClient::convertToBookieAddresses) + .thenComposeAsync(children -> { + Set bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children); + Set bookies = convertToBookieAddresses(children); + List>> bookieInfoUpdated = + new ArrayList<>(bookies.size()); + for (BookieId id : bookies) { + // update the cache for new bookies + if (!bookieServiceInfoCache.containsKey(id)) { + bookieInfoUpdated.add(readBookieServiceInfoAsync(id)); + } + } + if (bookieInfoUpdated.isEmpty()) { + return CompletableFuture.completedFuture(bookieIds); + } else { + return FutureUtil + .waitForAll(bookieInfoUpdated) + .thenApply(___ -> bookieIds); + } + }) .thenApply(s -> new Versioned<>(s, Version.NEW)); } @@ -129,10 +162,20 @@ public void unwatchReadOnlyBookies(RegistrationListener registrationListener) { private void updatedBookies(Notification n) { if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) { + + if (n.getType() == NotificationType.Deleted) { + BookieId bookieId = stripBookieIdFromPath(n.getPath()); + log.info("Bookie {} disappeared", bookieId); + if (bookieId != null) { + bookieServiceInfoCache.remove(bookieId); + } + } + if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) { - getReadOnlyBookies().thenAccept(bookies -> - readOnlyBookiesWatchers.keySet() - .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)))); + getReadOnlyBookies().thenAccept(bookies -> { + readOnlyBookiesWatchers.keySet() + .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))); + }); } else if (n.getPath().startsWith(bookieRegistrationPath)) { getWritableBookies().thenAccept(bookies -> writableBookiesWatchers.keySet() @@ -141,6 +184,22 @@ private void updatedBookies(Notification n) { } } + private static BookieId stripBookieIdFromPath(String path) { + if (path == null) { + return null; + } + final int slash = path.lastIndexOf('/'); + if (slash >= 0) { + try { + return BookieId.parse(path.substring(slash + 1)); + } catch (IllegalArgumentException e) { + log.warn("Cannot decode bookieId from {}", path, e); + } + } + return null; + } + + private static Set convertToBookieAddresses(List children) { // Read the bookie addresses into a set for efficient lookup HashSet newBookieAddrs = new HashSet<>(); @@ -153,4 +212,56 @@ private static Set convertToBookieAddresses(List children) { } return newBookieAddrs; } + + @Override + public CompletableFuture> getBookieServiceInfo(BookieId bookieId) { + // this method cannot perform blocking calls to the MetadataStore + // or return a CompletableFuture that is completed on the MetadataStore main thread + // this is because there are a few cases in which some operations on the main thread + // wait for the result. This is due to the fact that resolving the address of a bookie + // is needed in many code paths. + Versioned resultFromCache = bookieServiceInfoCache.get(bookieId); + if (log.isDebugEnabled()) { + log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache); + } + if (resultFromCache != null) { + return CompletableFuture.completedFuture(resultFromCache); + } else { + return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException()); + } + } + + public CompletableFuture> readBookieServiceInfoAsync(BookieId bookieId) { + String asWritable = bookieRegistrationPath + "/" + bookieId; + return bookieServiceInfoMetadataCache.get(asWritable) + .thenCompose((Optional getResult) -> { + if (getResult.isPresent()) { + Versioned res = + new Versioned<>(getResult.get(), new LongVersion(-1)); + log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get()); + bookieServiceInfoCache.put(bookieId, res); + return CompletableFuture.completedFuture(res); + } else { + return readBookieInfoAsReadonlyBookie(bookieId); + } + } + ); + } + + final CompletableFuture> readBookieInfoAsReadonlyBookie(BookieId bookieId) { + String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId; + return bookieServiceInfoMetadataCache.get(asReadonly) + .thenApply((Optional getResultAsReadOnly) -> { + if (getResultAsReadOnly.isPresent()) { + Versioned res = + new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1)); + log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, + getResultAsReadOnly.get()); + bookieServiceInfoCache.put(bookieId, res); + return res; + } else { + throw new CompletionException(new BKException.BKBookieHandleNotAvailableException()); + } + }); + } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java index 38195b230ce29..df70c9cf5dbe0 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -114,6 +116,66 @@ public void testGetReadonlyBookies(String provider, Supplier urlSupplier assertEquals(addresses.size(), result.getValue().size()); } + @Test(dataProvider = "impl") + public void testGetBookieServiceInfo(String provider, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = + MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String ledgersRoot = "/test/ledgers-" + UUID.randomUUID(); + + @Cleanup + RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class)); + + @Cleanup + RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot); + + List addresses = new ArrayList<>(prepareNBookies(10)); + List bookieServiceInfos = new ArrayList<>(); + int port = 223; + for (BookieId address : addresses) { + BookieServiceInfo info = new BookieServiceInfo(); + BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); + endpoint.setAuth(Collections.emptyList()); + endpoint.setExtensions(Collections.emptyList()); + endpoint.setId("id"); + endpoint.setHost("localhost"); + endpoint.setPort(port++); + endpoint.setProtocol("bookie-rpc"); + info.setEndpoints(Arrays.asList(endpoint)); + bookieServiceInfos.add(info); + // some readonly, some writable + boolean readOnly = port % 2 == 0; + rm.registerBookie(address, readOnly, info); + } + + // trigger loading the BookieServiceInfo in the local cache + rc.getAllBookies().join(); + + int i = 0; + for (BookieId address : addresses) { + BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue(); + compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++)); + } + + } + + private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) { + assertEquals(a.getProperties(), b.getProperties()); + assertEquals(a.getEndpoints().size(), b.getEndpoints().size()); + for (int i = 0; i < a.getEndpoints().size(); i++) { + BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i); + BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i); + assertEquals(e1.getHost(), e2.getHost()); + assertEquals(e1.getPort(), e2.getPort()); + assertEquals(e1.getId(), e2.getId()); + assertEquals(e1.getProtocol(), e2.getProtocol()); + assertEquals(e1.getExtensions(), e2.getExtensions()); + assertEquals(e1.getAuth(), e2.getAuth()); + } + + } + @Test(dataProvider = "impl") public void testGetAllBookies(String provider, Supplier urlSupplier) throws Exception { @Cleanup