From ba890cc950066fccfea6f3f6bf4cb72c33f5d063 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Sat, 24 Sep 2022 22:07:49 +0200 Subject: [PATCH] Make BookieId work with PulsarRegistrationDriver (#17762) * Make BookieId work with PulsarRegistrationDriver * Switch to MetadataCache * checkstyle --- .../bookkeeper/BookieServiceInfoSerde.java | 55 ++++++++++++++++- .../bookkeeper/PulsarRegistrationClient.java | 36 +++++++++++ .../PulsarRegistrationClientTest.java | 59 +++++++++++++++++++ 3 files changed, 148 insertions(+), 2 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index 78a33179e76b6..b7e3024b637fb 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; +import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -63,7 +64,57 @@ public byte[] serialize(String path, BookieServiceInfo bookieServiceInfo) throws } @Override - public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException { - return null; + public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { + // see https://github.com/apache/bookkeeper/blob/ + // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ + // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 + String bookieId = extractBookiedIdFromPath(path); + if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { + return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); + } + + BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); + BookieServiceInfo bsi = new BookieServiceInfo(); + List endpoints = builder.getEndpointsList().stream() + .map(e -> { + BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); + endpoint.setId(e.getId()); + endpoint.setPort(e.getPort()); + endpoint.setHost(e.getHost()); + endpoint.setProtocol(e.getProtocol()); + endpoint.setAuth(e.getAuthList()); + endpoint.setExtensions(e.getExtensionsList()); + return endpoint; + }) + .collect(Collectors.toList()); + + bsi.setEndpoints(endpoints); + bsi.setProperties(builder.getPropertiesMap()); + + return bsi; + + } + + /** + * Extract the BookieId + * The path should look like /ledgers/available/bookieId + * or /ledgers/available/readonly/bookieId. + * But the prefix depends on the configuration. + * @param path + * @return the bookieId + */ + private static String extractBookiedIdFromPath(String path) throws IOException { + // https://github.com/apache/bookkeeper/blob/ + // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ + // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 + if (path == null) { + path = ""; + } + int last = path.lastIndexOf("/"); + if (last >= 0) { + return path.substring(last + 1); + } else { + throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); + } } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 52b50e3ea4b08..1c6924043182e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -25,15 +25,21 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.discover.RegistrationClient; import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -49,12 +55,14 @@ public class PulsarRegistrationClient implements RegistrationClient { private final Map writableBookiesWatchers = new ConcurrentHashMap<>(); private final Map readOnlyBookiesWatchers = new ConcurrentHashMap<>(); + private final MetadataCache bookieServiceInfoMetadataCache; private final ScheduledExecutorService executor; public PulsarRegistrationClient(MetadataStore store, String ledgersRootPath) { this.store = store; this.ledgersRootPath = ledgersRootPath; + this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE); // Following Bookie Network Address Changes is an expensive operation // as it requires additional ZooKeeper watches @@ -153,4 +161,32 @@ private static Set convertToBookieAddresses(List children) { } return newBookieAddrs; } + + @Override + public CompletableFuture> getBookieServiceInfo(BookieId bookieId) { + String asWritable = bookieRegistrationPath + "/" + bookieId; + + return bookieServiceInfoMetadataCache.get(asWritable) + .thenCompose((Optional getResult) -> { + if (getResult.isPresent()) { + return CompletableFuture.completedFuture(new Versioned<>(getResult.get(), + new LongVersion(-1))); + } else { + return readBookieInfoAsReadonlyBookie(bookieId); + } + } + ); + } + + final CompletableFuture> readBookieInfoAsReadonlyBookie(BookieId bookieId) { + String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId; + return bookieServiceInfoMetadataCache.get(asReadonly) + .thenApply((Optional getResultAsReadOnly) -> { + if (getResultAsReadOnly.isPresent()) { + return new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1)); + } else { + throw new CompletionException(new BKException.BKBookieHandleNotAvailableException()); + } + }); + } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java index 496cfebea512f..047bedd158777 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java @@ -23,6 +23,8 @@ import static org.testng.Assert.assertFalse; import static org.mockito.Mockito.mock; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -114,6 +116,63 @@ public void testGetReadonlyBookies(String provider, Supplier urlSupplier assertEquals(result.getValue().size(), addresses.size()); } + @Test(dataProvider = "impl") + public void testGetBookieServiceInfo(String provider, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = + MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String ledgersRoot = "/test/ledgers-" + UUID.randomUUID(); + + @Cleanup + RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class)); + + @Cleanup + RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot); + + List addresses = new ArrayList<>(prepareNBookies(10)); + List bookieServiceInfos = new ArrayList<>(); + int port = 223; + for (BookieId address : addresses) { + BookieServiceInfo info = new BookieServiceInfo(); + BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); + endpoint.setAuth(Collections.emptyList()); + endpoint.setExtensions(Collections.emptyList()); + endpoint.setId("id"); + endpoint.setHost("localhost"); + endpoint.setPort(port++); + endpoint.setProtocol("bookie-rpc"); + info.setEndpoints(Arrays.asList(endpoint)); + bookieServiceInfos.add(info); + // some readonly, some writable + boolean readOnly = port % 2 == 0; + rm.registerBookie(address, readOnly, info); + } + + int i = 0; + for (BookieId address : addresses) { + BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue(); + compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++)); + } + + } + + private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) { + assertEquals(a.getProperties(), b.getProperties()); + assertEquals(a.getEndpoints().size(), b.getEndpoints().size()); + for (int i = 0; i < a.getEndpoints().size(); i++) { + BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i); + BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i); + assertEquals(e1.getHost(), e2.getHost()); + assertEquals(e1.getPort(), e2.getPort()); + assertEquals(e1.getId(), e2.getId()); + assertEquals(e1.getProtocol(), e2.getProtocol()); + assertEquals(e1.getExtensions(), e2.getExtensions()); + assertEquals(e1.getAuth(), e2.getAuth()); + } + + } + @Test(dataProvider = "impl") public void testGetAllBookies(String provider, Supplier urlSupplier) throws Exception { @Cleanup