From f31c5de71ce193de3cfa837f99e1a584824a6669 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 26 Nov 2024 20:46:24 +0800 Subject: [PATCH] [fix] [broker] fix null lookup result when brokers are starting --- .../org/apache/pulsar/broker/PulsarService.java | 6 +++--- .../pulsar/broker/lookup/TopicLookupBase.java | 17 +++++++++++++++++ .../pulsar/broker/namespace/OwnershipCache.java | 8 +++++++- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 05491d9c281c6..a8331d86fc441 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -979,6 +979,9 @@ public void start() throws PulsarServerException { this.webSocketService.setLocalCluster(clusterData); } + // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. + this.nsService.initialize(); + // Start the leader election service startLeaderElectionService(); @@ -990,9 +993,6 @@ public void start() throws PulsarServerException { // (namespace service depends on load manager) this.startLoadManagementService(); - // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. - this.nsService.initialize(); - // Start topic level policies service this.topicPoliciesService = initTopicPoliciesService(); this.topicPoliciesService.start(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 42f145d32aab1..41992cc2da418 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -318,6 +318,7 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe } LookupData lookupData = lookupResult.get().getLookupData(); + printWarnLogIfLookupResUnexpected(topicName, lookupData, pulsarService); if (lookupResult.get().isRedirect()) { boolean newAuthoritative = lookupResult.get().isAuthoritativeRedirect(); lookupfuture.complete( @@ -342,6 +343,22 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe return lookupfuture; } + /** + * Check if a internal client will get a null lookup result. + */ + private static void printWarnLogIfLookupResUnexpected(TopicName topic, LookupData lookupData, + PulsarService pulsar) { + if (!pulsar.getBrokerService().isSystemTopic(topic)) { + return; + } + boolean tlsEnabled = pulsar.getConfig().isBrokerClientTlsEnabled(); + if (!tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrl())) { + log.warn("[{}] Unexpected lookup result {}", topic.toString(), lookupData); + } else if (tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrlTls())) { + log.warn("[{}] Unexpected lookup result {}", topic.toString(), lookupData); + } + } + private static void handleLookupError(CompletableFuture lookupFuture, String topicName, String clientAppId, long requestId, Throwable ex){ Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 868ed2d9fc2c1..ce68c036a622c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -72,7 +72,7 @@ public class OwnershipCache { /** * The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled. */ - private final NamespaceEphemeralData selfOwnerInfoDisabled; + private NamespaceEphemeralData selfOwnerInfoDisabled; private final LockManager lockManager; @@ -119,6 +119,9 @@ public OwnershipCache(PulsarService pulsar, NamespaceService namespaceService) { this.pulsar = pulsar; this.ownerBrokerUrl = pulsar.getBrokerServiceUrl(); this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls(); + // At this moment, the variables "webServiceAddress" and "webServiceAddressTls" and so on have not been + // initialized, so we will get an empty "selfOwnerInfo" and an empty "selfOwnerInfoDisabled" here. + // But do not worry, these two fields will be set by the method "refreshSelfOwnerInfo" soon. this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners()); @@ -351,6 +354,9 @@ public synchronized boolean refreshSelfOwnerInfo() { this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners()); + this.selfOwnerInfoDisabled = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(), + pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(), + pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners()); return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null; } }