Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Branch-2.10] [fix] [broker] Fix bookeeper packages npe #17291

Merged
merged 2 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

### --- General broker settings --- ###

# Zookeeper quorum connection string
zookeeperServers=
# The metadata store URL
# Examples:
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
# * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)
metadataStoreUrl=

# Configuration Store connection string
configurationStoreServers=
# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl
configurationMetadataStoreUrl=

brokerServicePort=6650

Expand Down Expand Up @@ -1087,3 +1091,11 @@ zooKeeperOperationTimeoutSeconds=-1
# ZooKeeper cache expiry time in seconds
# Deprecated: use metadataStoreCacheExpirySeconds
zooKeeperCacheExpirySeconds=-1

# Zookeeper quorum connection string
# Deprecated: use metadataStoreUrl
zookeeperServers=

# Configuration Store connection string
# Deprecated: use configurationMetadataStoreUrl
configurationStoreServers=
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;

public final class PulsarStandaloneBuilder {

Expand Down Expand Up @@ -114,8 +115,10 @@ public PulsarStandalone build() {
}

// Set ZK server's host to localhost
pulsarStandalone.getConfig().setZookeeperServers(zkServers + ":" + pulsarStandalone.getZkPort());
pulsarStandalone.getConfig().setConfigurationStoreServers(zkServers + ":" + pulsarStandalone.getZkPort());
final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + pulsarStandalone.getZkPort();
pulsarStandalone.getConfig().setMetadataStoreUrl(metadataStoreUrl);
pulsarStandalone.getConfig().setConfigurationMetadataStoreUrl(metadataStoreUrl);
pulsarStandalone.getConfig().setRunningStandalone(true);
return pulsarStandalone;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,8 +101,11 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}
}
}
config.setZookeeperServers(zkServers + ":" + this.getZkPort());
config.setConfigurationStoreServers(zkServers + ":" + this.getZkPort());

final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + this.getZkPort();
config.setMetadataStoreUrl(metadataStoreUrl);
config.setConfigurationMetadataStoreUrl(metadataStoreUrl);

config.setRunningStandalone(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
@ToString
public class InternalConfigurationData {

private String zookeeperServers;
private String configurationStoreServers;
private String metadataStoreUrl;
private String configurationMetadataStoreUrl;
@Deprecated
private String ledgersRootPath;
private String bookkeeperMetadataServiceUri;
Expand All @@ -38,23 +38,23 @@ public InternalConfigurationData() {
}

public InternalConfigurationData(String zookeeperServers,
String configurationStoreServers,
String configurationMetadataStoreUrl,
String ledgersRootPath,
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
this.zookeeperServers = zookeeperServers;
this.configurationStoreServers = configurationStoreServers;
this.metadataStoreUrl = zookeeperServers;
this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
this.ledgersRootPath = ledgersRootPath;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}

public String getZookeeperServers() {
return zookeeperServers;
public String getMetadataStoreUrl() {
return metadataStoreUrl;
}

public String getConfigurationStoreServers() {
return configurationStoreServers;
public String getConfigurationMetadataStoreUrl() {
return configurationMetadataStoreUrl;
}

/** @deprecated */
Expand All @@ -77,17 +77,17 @@ public boolean equals(Object obj) {
return false;
}
InternalConfigurationData other = (InternalConfigurationData) obj;
return Objects.equals(zookeeperServers, other.zookeeperServers)
&& Objects.equals(configurationStoreServers, other.configurationStoreServers)
return Objects.equals(metadataStoreUrl, other.metadataStoreUrl)
&& Objects.equals(configurationMetadataStoreUrl, other.configurationMetadataStoreUrl)
&& Objects.equals(ledgersRootPath, other.ledgersRootPath)
&& Objects.equals(bookkeeperMetadataServiceUri, other.bookkeeperMetadataServiceUri)
&& Objects.equals(stateStorageServiceUrl, other.stateStorageServiceUrl);
}

@Override
public int hashCode() {
return Objects.hash(zookeeperServers,
configurationStoreServers,
return Objects.hash(metadataStoreUrl,
configurationMetadataStoreUrl,
ledgersRootPath,
bookkeeperMetadataServiceUri,
stateStorageServiceUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;

import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -274,13 +275,14 @@ private static URI initializeStandaloneWorkerService(PulsarClientCreator clientC
URI dlogURI;
try {
if (workerConfig.isInitializedDlogMetadata()) {
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing "
+ "function packages", internalConf.getZookeeperServers(),
+ "function packages", internalConf.getMetadataStoreUrl(),
internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
Expand Down Expand Up @@ -356,15 +358,16 @@ public void initInBroker(ServiceConfiguration brokerConfig,
URI dlogURI;
try {
// initializing dlog namespace for function worker
if (workerConfig.isInitializedDlogMetadata()){
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
if (workerConfig.isInitializedDlogMetadata()) {
String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for "
+ "storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
internalConf.getMetadataStoreUrl(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -184,7 +185,7 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf
// for BC purposes
if (internalConf.getBookkeeperMetadataServiceUri() == null) {
ledgersRootPath = internalConf.getLedgersRootPath();
ledgersStoreServers = internalConf.getZookeeperServers();
ledgersStoreServers = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
chrootPath = "";
} else {
URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
public class BookKeeperPackagesStorage implements PackagesStorage {

private static final String NS_CLIENT_ID = "packages-management";
public static final String ZK_SCHEME_IDENTIFIER = "zk:";
final BookKeeperPackagesStorageConfiguration configuration;
private Namespace namespace;

Expand Down Expand Up @@ -100,12 +101,18 @@ private URI initializeDlogNamespace() throws IOException {
ledgersRootPath = metadataServiceUri.getPath();
} else {
ledgersRootPath = configuration.getPackagesManagementLedgerRootPath();
ledgersStoreServers = configuration.getZookeeperServers();
if (StringUtils.isNotBlank(configuration.getMetadataStoreUrl())) {
ledgersStoreServers = configuration.getMetadataStoreUrl();
if (ledgersStoreServers.startsWith(ZK_SCHEME_IDENTIFIER)) {
ledgersStoreServers = ledgersStoreServers.substring(ZK_SCHEME_IDENTIFIER.length());
}
} else {
ledgersStoreServers = configuration.getZookeeperServers();
}
}
BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages",
configuration.getZookeeperServers()));
URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", ledgersStoreServers));
try {
dlMetadata.create(dlogURI);
} catch (ZKException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ String getZookeeperServers() {
return getProperty("zookeeperServers");
}

String getMetadataStoreUrl() {
return getProperty("metadataStoreUrl");
}

String getPackagesManagementLedgerRootPath() {
return getProperty("packagesManagementLedgerRootPath");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void setup() throws Exception {
PackagesStorageProvider provider = PackagesStorageProvider
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
storage = provider.getStorage(configuration);
Expand All @@ -68,7 +68,7 @@ public void teardown() throws Exception {
public void testConfiguration() {
assertTrue(storage instanceof BookKeeperPackagesStorage);
BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage;
assertEquals(bkStorage.configuration.getZookeeperServers(), zkUtil.getZooKeeperConnectString());
assertEquals(bkStorage.configuration.getMetadataStoreUrl(), zkUtil.getZooKeeperConnectString());
assertEquals(bkStorage.configuration.getPackagesReplicas(), 1);
assertEquals(bkStorage.configuration.getPackagesManagementLedgerRootPath(), "/ledgers");
}
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testReadWriteOperationsWithSeparatedBkCluster() throws Exception {
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
// set the unavailable bk cluster with mock zookeeper path
configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString() + "/mock");
configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString() + "/mock");
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
PackagesStorage storage1 = provider.getStorage(configuration);
Expand All @@ -221,7 +221,7 @@ public void testReadWriteOperationsWithSeparatedBkCluster() throws Exception {
// set the available bk cluster with bookkeeperMetadataServiceUri using actual zookeeper path
String bookkeeperMetadataServiceUri = String.format("zk+null://%s/ledgers", zkUtil.getZooKeeperConnectString());
DefaultPackagesStorageConfiguration configuration2 = new DefaultPackagesStorageConfiguration();
configuration2.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
configuration2.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
configuration2.setProperty("bookkeeperMetadataServiceUri", bookkeeperMetadataServiceUri);
configuration2.setProperty("packagesReplicas", "1");
PackagesStorage storage2 = provider.getStorage(configuration2);
Expand Down