Skip to content

Commit

Permalink
[improve] Refactored BK ClientFactory to return futures (apache#22853)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 6, 2024
1 parent 4341f0f commit d74010c
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 191 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicN
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();

final CountDownLatch mlMetaCounter = new CountDownLatch(1);

store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */,
Expand Down Expand Up @@ -180,12 +180,16 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat)
if (log.isDebugEnabled()) {
log.debug("[{}] Opening ledger {}", managedLedgerName, id);
}
try {
bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null);
} catch (Exception e) {
log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, e);
mlMetaCounter.countDown();
}

factory.getBookKeeper()
.thenAccept(bk -> {
bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, ex);
opencb.openComplete(-1, null, null);
mlMetaCounter.countDown();
return null;
});
} else {
log.warn("[{}] Ledger list empty", managedLedgerName);
mlMetaCounter.countDown();
Expand Down Expand Up @@ -217,7 +221,7 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin
}
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
BookKeeper bk = factory.getBookKeeper().get();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.pulsar.broker;

import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.stats.StatsLogger;
Expand All @@ -31,13 +31,16 @@
* Provider of a new BookKeeper client instance.
*/
public interface BookKeeperClientFactory {
BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> policyClass,
Map<String, Object> ensemblePlacementPolicyProperties);

CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> policyClass,
Map<String, Object> ensemblePlacementPolicyProperties,
StatsLogger statsLogger);

BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties,
StatsLogger statsLogger) throws IOException;
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
Expand All @@ -53,19 +54,19 @@
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) throws IOException {
return create(conf, store, eventLoopGroup, ensemblePlacementPolicyClass, properties,
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> policyClass,
Map<String, Object> properties) {
return create(conf, store, eventLoopGroup, policyClass, properties,
NullStatsLogger.INSTANCE);
}

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
Map<String, Object> properties, StatsLogger statsLogger) {
PulsarMetadataClientDriver.init();

ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
Expand All @@ -77,11 +78,14 @@ public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
} else {
setDefaultEnsemblePlacementPolicy(bkConf, conf, store);
}
try {
return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, bkConf).build();
} catch (InterruptedException | BKException e) {
throw new IOException(e);
}

return CompletableFuture.supplyAsync(() -> {
try {
return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, bkConf).build();
} catch (InterruptedException | BKException | IOException e) {
throw new RuntimeException(e);
}
});
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
Expand All @@ -48,8 +50,8 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {

private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
private final Map<EnsemblePlacementPolicyConfig, BookKeeper>
bkEnsemblePolicyToBkClientMap = new ConcurrentHashMap<>();
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
private StatsProvider statsProvider = new NullStatsProvider();

public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
Expand Down Expand Up @@ -89,27 +91,20 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_managedLedger_client");

this.defaultBkClient =
bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, Optional.empty(), null, statsLogger);
bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, Optional.empty(), null, statsLogger)
.get();

BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
BookKeeper bkClient = null;
// find or create bk-client in cache for a specific ensemblePlacementPolicy
if (ensemblePlacementPolicyConfig != null && ensemblePlacementPolicyConfig.getPolicyClass() != null) {
bkClient = bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig, (key) -> {
try {
return bookkeeperProvider.create(conf, metadataStore, eventLoopGroup,
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
ensemblePlacementPolicyConfig.getProperties(), statsLogger);
} catch (Exception e) {
log.error("Failed to initialize bk-client for policy {}, properties {}",
ensemblePlacementPolicyConfig.getPolicyClass(),
ensemblePlacementPolicyConfig.getProperties(), e);
}
return this.defaultBkClient;
});
if (ensemblePlacementPolicyConfig == null || ensemblePlacementPolicyConfig.getPolicyClass() == null) {
return CompletableFuture.completedFuture(defaultBkClient);
}
return bkClient != null ? bkClient : defaultBkClient;

// find or create bk-client in cache for a specific ensemblePlacementPolicy
return bkEnsemblePolicyToBkClientMap.get(ensemblePlacementPolicyConfig,
(config, executor) -> bookkeeperProvider.create(conf, metadataStore, eventLoopGroup,
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
ensemblePlacementPolicyConfig.getProperties(), statsLogger));
};

try {
Expand All @@ -136,7 +131,7 @@ public StatsProvider getStatsProvider() {

@VisibleForTesting
public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
return bkEnsemblePolicyToBkClientMap;
return bkEnsemblePolicyToBkClientMap.synchronous().asMap();
}

@Override
Expand Down Expand Up @@ -164,7 +159,7 @@ public void close() throws IOException {
// factory, however that might be introducing more unknowns.
log.warn("Encountered exceptions on closing bookkeeper client", ree);
}
bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
bkEnsemblePolicyToBkClientMap.synchronous().asMap().forEach((policy, bk) -> {
try {
if (bk != null) {
bk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void start() throws Exception {
pulsar.getIoEventLoopGroup(),
Optional.empty(),
null
);
).get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void start() throws IOException {
pulsar.getIoEventLoopGroup(),
Optional.empty(),
null
);
).join();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public static void main(String[] args) throws Exception {
new DefaultThreadFactory("compactor-io"));

@Cleanup
BookKeeper bk = bkClientFactory.create(brokerConfig, store, eventLoopGroup, Optional.empty(), null);
BookKeeper bk = bkClientFactory.create(brokerConfig, store, eventLoopGroup, Optional.empty(), null).get();

@Cleanup
PulsarClient pulsar = createClient(brokerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.pulsar.broker;

import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
Expand Down Expand Up @@ -51,19 +51,19 @@ public MockedBookKeeperClientFactory() {
}

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) throws IOException {
return mockedBk;
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) {
return CompletableFuture.completedFuture(mockedBk);
}

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
return mockedBk;
Map<String, Object> properties, StatsLogger statsLogger) {
return CompletableFuture.completedFuture(mockedBk);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.EventLoopGroup;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.stats.StatsLogger;
Expand All @@ -39,21 +40,21 @@ class MockBookKeeperClientFactory implements BookKeeperClientFactory {
}

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) {
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) {
// Always return the same instance (so that we don't loose the mock BK content on broker restart
return mockBookKeeper;
return CompletableFuture.completedFuture(mockBookKeeper);
}

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties, StatsLogger statsLogger) {
// Always return the same instance (so that we don't loose the mock BK content on broker restart
return mockBookKeeper;
return CompletableFuture.completedFuture(mockBookKeeper);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void cleanup() throws Exception {
public void testEntryLookup() throws Exception {
@Cleanup
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
this.conf, null, null, Optional.empty(), null).get();

Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData
= buildCompactedLedger(bk, 500);
Expand Down Expand Up @@ -219,7 +219,7 @@ public void testEntryLookup() throws Exception {
public void testCleanupOldCompactedTopicLedger() throws Exception {
@Cleanup
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
this.conf, null, null, Optional.empty(), null).get();

LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Expand Down Expand Up @@ -849,7 +849,7 @@ public void testReadCompactedLatestMessageWithInclusive() throws Exception {
public void testCompactWithConcurrentGetCompactionHorizonAndCompactedTopicContext() throws Exception {
@Cleanup
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
this.conf, null, null, Optional.empty(), null).get();

Mockito.doAnswer(invocation -> {
Thread.sleep(1500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void setup() throws Exception {

compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get();
compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void setup() throws Exception {

compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get();
compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void setup() throws Exception {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
this.conf, null, null, Optional.empty(), null).get();
compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}

Expand Down
Loading

0 comments on commit d74010c

Please sign in to comment.