Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][functions] Ensure InternalConfigurationData data model is compatible across different versions #17690

Merged
merged 3 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -87,9 +89,11 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
Expand Down Expand Up @@ -205,6 +209,49 @@ public void internalConfiguration() throws Exception {
assertEquals(response, expectedData);
}

@Data
@AllArgsConstructor
/**
* Internal configuration data model before (before https://github.com/apache/pulsar/pull/14384).
*/
private static class OldInternalConfigurationData {
private String zookeeperServers;
private String configurationStoreServers;
@Deprecated
private String ledgersRootPath;
private String bookkeeperMetadataServiceUri;
private String stateStorageServiceUrl;
}

/**
* This test verifies that the model data changes in InternalConfigurationData are retro-compatible.
* InternalConfigurationData is downloaded from the Function worker from a broker.
* The broker may be still serve an "old" version of InternalConfigurationData
* (before https://github.com/apache/pulsar/pull/14384) while the Worker already uses the new one.
* @throws Exception
*/
@Test
public void internalConfigurationRetroCompatibility() throws Exception {
OldInternalConfigurationData oldDataModel = new OldInternalConfigurationData(
MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL(conf.getMetadataStoreUrl()),
conf.getConfigurationMetadataStoreUrl(),
new ClientConfiguration().getZkLedgersRootPath(),
conf.isBookkeeperMetadataStoreSeparated() ? conf.getBookkeeperMetadataStoreUrl() : null,
pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));

final Map<String, Object> oldDataJson = ObjectMapperFactory
.getThreadLocal().convertValue(oldDataModel, Map.class);

final InternalConfigurationData newData = ObjectMapperFactory.getThreadLocal()
.convertValue(oldDataJson, InternalConfigurationData.class);

assertEquals(newData.getMetadataStoreUrl(), conf.getMetadataStoreUrl());
assertEquals(newData.getConfigurationMetadataStoreUrl(), oldDataModel.getConfigurationStoreServers());
assertEquals(newData.getLedgersRootPath(), oldDataModel.getLedgersRootPath());
assertEquals(newData.getBookkeeperMetadataServiceUri(), oldDataModel.getBookkeeperMetadataServiceUri());
assertEquals(newData.getStateStorageServiceUrl(), oldDataModel.getStateStorageServiceUrl());
}

@Test
@SuppressWarnings("unchecked")
public void clusters() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
@ToString
public class InternalConfigurationData {

@Deprecated
private String zookeeperServers;
@Deprecated
private String configurationStoreServers;
private String metadataStoreUrl;
private String configurationMetadataStoreUrl;
@Deprecated
Expand All @@ -43,18 +47,48 @@ public InternalConfigurationData(String zookeeperServers,
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
this.metadataStoreUrl = zookeeperServers;
this.zookeeperServers = zookeeperServers;
this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
this.configurationStoreServers = configurationMetadataStoreUrl;
this.ledgersRootPath = ledgersRootPath;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}

@Deprecated
public String getZookeeperServers() {
return zookeeperServers;
}

@Deprecated
public void setZookeeperServers(String zookeeperServers) {
this.zookeeperServers = zookeeperServers;
}

@Deprecated
public String getConfigurationStoreServers() {
return configurationStoreServers;
}

@Deprecated
public void setConfigurationStoreServers(String configurationStoreServers) {
this.configurationStoreServers = configurationStoreServers;
}

public String getMetadataStoreUrl() {
return metadataStoreUrl;
if (metadataStoreUrl != null) {
return metadataStoreUrl;
} else if (zookeeperServers != null) {
return "zk:" + zookeeperServers;
}
return null;
}

public String getConfigurationMetadataStoreUrl() {
return configurationMetadataStoreUrl;
if (configurationMetadataStoreUrl != null) {
return configurationMetadataStoreUrl;
}
return configurationStoreServers;
}

/** @deprecated */
Expand Down