Skip to content

Commit

Permalink
[fix] [broker] fix null lookup result when brokers are starting
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Nov 26, 2024
1 parent 949750f commit f31c5de
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,9 @@ public void start() throws PulsarServerException {
this.webSocketService.setLocalCluster(clusterData);
}

// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
this.nsService.initialize();

// Start the leader election service
startLeaderElectionService();

Expand All @@ -990,9 +993,6 @@ public void start() throws PulsarServerException {
// (namespace service depends on load manager)
this.startLoadManagementService();

// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
this.nsService.initialize();

// Start topic level policies service
this.topicPoliciesService = initTopicPoliciesService();
this.topicPoliciesService.start(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
}

LookupData lookupData = lookupResult.get().getLookupData();
printWarnLogIfLookupResUnexpected(topicName, lookupData, pulsarService);
if (lookupResult.get().isRedirect()) {
boolean newAuthoritative = lookupResult.get().isAuthoritativeRedirect();
lookupfuture.complete(
Expand All @@ -342,6 +343,22 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
return lookupfuture;
}

/**
* Check if a internal client will get a null lookup result.
*/
private static void printWarnLogIfLookupResUnexpected(TopicName topic, LookupData lookupData,
PulsarService pulsar) {
if (!pulsar.getBrokerService().isSystemTopic(topic)) {
return;
}
boolean tlsEnabled = pulsar.getConfig().isBrokerClientTlsEnabled();
if (!tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrl())) {
log.warn("[{}] Unexpected lookup result {}", topic.toString(), lookupData);
} else if (tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrlTls())) {
log.warn("[{}] Unexpected lookup result {}", topic.toString(), lookupData);
}
}

private static void handleLookupError(CompletableFuture<ByteBuf> lookupFuture, String topicName, String clientAppId,
long requestId, Throwable ex){
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class OwnershipCache {
/**
* The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled.
*/
private final NamespaceEphemeralData selfOwnerInfoDisabled;
private NamespaceEphemeralData selfOwnerInfoDisabled;

private final LockManager<NamespaceEphemeralData> lockManager;

Expand Down Expand Up @@ -119,6 +119,9 @@ public OwnershipCache(PulsarService pulsar, NamespaceService namespaceService) {
this.pulsar = pulsar;
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
// At this moment, the variables "webServiceAddress" and "webServiceAddressTls" and so on have not been
// initialized, so we will get an empty "selfOwnerInfo" and an empty "selfOwnerInfoDisabled" here.
// But do not worry, these two fields will be set by the method "refreshSelfOwnerInfo" soon.
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
false, pulsar.getAdvertisedListeners());
Expand Down Expand Up @@ -351,6 +354,9 @@ public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners());
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
}
}

0 comments on commit f31c5de

Please sign in to comment.