Skip to content

Commit

Permalink
Merge branch 'cherrypick-2.9-17562' into 'branch-2.9-wxg-release' (me…
Browse files Browse the repository at this point in the history
…rge request !44)

[fix][broker] Topic policy reader can't recover when get any exception (apache#17562)
  • Loading branch information
druidliu committed Sep 23, 2022
2 parents cdb86b0 + 6180c12 commit 48cfd60
Showing 1 changed file with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
Expand Down Expand Up @@ -230,21 +231,20 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
return result;
}

private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
readerCompletableFuture.whenComplete((reader, ex) -> {
if (ex != null) {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
result.completeExceptionally(ex);
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
}
readerCompletableFuture.thenAccept(reader -> {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
}).exceptionally(ex -> {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
cleanCacheAndCloseReader(namespace, false);
result.completeExceptionally(ex);
return null;
});
}
}
Expand Down Expand Up @@ -346,14 +346,18 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
});
}

private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
if (cleanOwnedBundlesCount) {
ownedBundlesCountPerNamespace.remove(namespace);
}
if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
.exceptionally(ex -> {
log.warn("[{}] Close change_event reader fail.", namespace, ex);
return null;
});
}
policyCacheInitMap.remove(namespace);
}
Expand Down

0 comments on commit 48cfd60

Please sign in to comment.