Skip to content

Commit

Permalink
fix unit test error
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Feb 25, 2022
1 parent 3c4dcc2 commit 7617a36
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;

import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import static org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -69,7 +70,6 @@
import org.apache.pulsar.functions.worker.service.api.Sources;
import org.apache.pulsar.functions.worker.service.api.Workers;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -273,8 +273,11 @@ private static URI initializeStandaloneWorkerService(PulsarClientCreator clientC
URI dlogURI;
try {
if (workerConfig.isInitializedDlogMetadata()) {
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getMetadataStoreUrl()
.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length()));
String metadataStoreUrl = internalConf.getMetadataStoreUrl();
if (metadataStoreUrl.startsWith(ZK_SCHEME_IDENTIFIER)) {
metadataStoreUrl = metadataStoreUrl.substring(ZK_SCHEME_IDENTIFIER.length());
}
dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
Expand Down Expand Up @@ -357,9 +360,11 @@ public void initInBroker(ServiceConfiguration brokerConfig,
try {
// initializing dlog namespace for function worker
if (workerConfig.isInitializedDlogMetadata()){
dlogURI =
WorkerUtils.newDlogNamespaceURI(internalConf.getMetadataStoreUrl()
.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length()));
String metadataStoreUrl = internalConf.getMetadataStoreUrl();
if (metadataStoreUrl.startsWith(ZK_SCHEME_IDENTIFIER)) {
metadataStoreUrl = metadataStoreUrl.substring(ZK_SCHEME_IDENTIFIER.length());
}
dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -175,8 +176,11 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf
// for BC purposes
if (internalConf.getBookkeeperMetadataServiceUri() == null) {
ledgersRootPath = internalConf.getLedgersRootPath();
ledgersStoreServers = internalConf.getMetadataStoreUrl()
.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length());
String metadataStoreUrl = internalConf.getMetadataStoreUrl();
if (metadataStoreUrl.startsWith(ZK_SCHEME_IDENTIFIER)) {
metadataStoreUrl = metadataStoreUrl.substring(ZK_SCHEME_IDENTIFIER.length());
}
ledgersStoreServers = metadataStoreUrl;
chrootPath = "";
} else {
URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
public class BookKeeperPackagesStorage implements PackagesStorage {

private static final String NS_CLIENT_ID = "packages-management";
public static final String ZK_SCHEME_IDENTIFIER = "zk:";
final BookKeeperPackagesStorageConfiguration configuration;
private Namespace namespace;

Expand Down Expand Up @@ -92,12 +93,14 @@ private URI initializeDlogNamespace() throws IOException {
ledgersRootPath = metadataServiceUri.getPath();
} else {
ledgersRootPath = configuration.getPackagesManagementLedgerRootPath();
ledgersStoreServers = configuration.getZookeeperServers();
ledgersStoreServers = configuration.getMetadataStoreUrl();
if (ledgersStoreServers.startsWith(ZK_SCHEME_IDENTIFIER)) {
ledgersStoreServers = ledgersStoreServers.substring(ZK_SCHEME_IDENTIFIER.length());
}
}
BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages",
configuration.getZookeeperServers()));
URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", ledgersStoreServers));
try {
dlMetadata.create(dlogURI);
} catch (ZKException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ int getPackagesReplicas() {
return Integer.parseInt(getProperty("packagesReplicas"));
}

String getZookeeperServers() {
return getProperty("zookeeperServers");
String getMetadataStoreUrl() {
return getProperty("metadataStoreUrl");
}

String getPackagesManagementLedgerRootPath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.packages.management.storage.bookkeeper;

import static org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -50,7 +51,7 @@ public void setup() throws Exception {
PackagesStorageProvider provider = PackagesStorageProvider
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
configuration.setProperty("metadataStoreUrl", ZK_SCHEME_IDENTIFIER + zkUtil.getZooKeeperConnectString());
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
storage = provider.getStorage(configuration);
Expand All @@ -68,7 +69,8 @@ public void teardown() throws Exception {
public void testConfiguration() {
assertTrue(storage instanceof BookKeeperPackagesStorage);
BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage;
assertEquals(bkStorage.configuration.getZookeeperServers(), zkUtil.getZooKeeperConnectString());
assertEquals(bkStorage.configuration.getMetadataStoreUrl()
.substring(ZK_SCHEME_IDENTIFIER.length()), zkUtil.getZooKeeperConnectString());
assertEquals(bkStorage.configuration.getPackagesReplicas(), 1);
assertEquals(bkStorage.configuration.getPackagesManagementLedgerRootPath(), "/ledgers");
}
Expand Down Expand Up @@ -198,7 +200,8 @@ public void testReadWriteOperationsWithSeparatedBkCluster() throws Exception {
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
// set the unavailable bk cluster with mock zookeeper path
configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString() + "/mock");
configuration.setProperty("metadataStoreUrl", ZK_SCHEME_IDENTIFIER + zkUtil.getZooKeeperConnectString() +
"/mock");
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
PackagesStorage storage1 = provider.getStorage(configuration);
Expand All @@ -221,7 +224,7 @@ public void testReadWriteOperationsWithSeparatedBkCluster() throws Exception {
// set the available bk cluster with bookkeeperMetadataServiceUri using actual zookeeper path
String bookkeeperMetadataServiceUri = String.format("zk+null://%s/ledgers", zkUtil.getZooKeeperConnectString());
DefaultPackagesStorageConfiguration configuration2 = new DefaultPackagesStorageConfiguration();
configuration2.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
configuration2.setProperty("metadataStoreUrl", ZK_SCHEME_IDENTIFIER + zkUtil.getZooKeeperConnectString());
configuration2.setProperty("bookkeeperMetadataServiceUri", bookkeeperMetadataServiceUri);
configuration2.setProperty("packagesReplicas", "1");
PackagesStorage storage2 = provider.getStorage(configuration2);
Expand Down

0 comments on commit 7617a36

Please sign in to comment.