Skip to content

Commit

Permalink
[controller] Add deferred version swap service to parent controller (#…
Browse files Browse the repository at this point in the history
…1421)

* add deferred version swap service to parent controller

* add unit + integration test for deferred version swap service
  • Loading branch information
misyel authored Jan 31, 2025
1 parent 43c280c commit 7e84caf
Show file tree
Hide file tree
Showing 9 changed files with 958 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2398,4 +2398,8 @@ private ConfigKeys() {
* Default: {@value com.linkedin.venice.meta.NameRepository#DEFAULT_MAXIMUM_ENTRY_COUNT}
*/
public static final String NAME_REPOSITORY_MAX_ENTRY_COUNT = "name.repository.max.entry.count";

public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS = "controller.deferred.version.swap.sleep.ms";
public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED =
"controller.deferred.version.swap.service.enabled";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestDeferredVersionSwap {
private static final int NUMBER_OF_CHILD_DATACENTERS = 2;
private static final int NUMBER_OF_CLUSTERS = 1;
private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
private static final String TARGET_REGION = "dc-0";

private static final String[] CLUSTER_NAMES =
IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new);

@BeforeClass
public void setUp() {
Properties controllerProps = new Properties();
controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS, 30000);
controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED, true);
Properties serverProperties = new Properties();

multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
NUMBER_OF_CHILD_DATACENTERS,
NUMBER_OF_CLUSTERS,
1,
1,
1,
1,
1,
Optional.of(controllerProps),
Optional.of(controllerProps),
Optional.of(serverProperties));
}

@AfterClass(alwaysRun = true)
public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test
public void testDeferredVersionSwap() throws IOException {
File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);
// Setup job properties
String inputDirPath = "file://" + inputDir.getAbsolutePath();
String storeName = Utils.getUniqueString("testDeferredVersionSwap");
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
String keySchemaStr = "\"string\"";
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true);
storeParms.setTargetRegionSwapWaitTime(1);
storeParms.setTargetRegionSwap(TARGET_REGION);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();

try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
createStoreForJob(CLUSTER_NAMES[0], keySchemaStr, NAME_RECORD_V3_SCHEMA.toString(), props, storeParms).close();

// Start push job with target region push enabled
props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
TestWriteUtils.runPushJob("Test push job", props);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 1),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Version should only be swapped in the target region
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
if (colo.equals(TARGET_REGION)) {
Assert.assertEquals((int) version, 1);
} else {
Assert.assertEquals((int) version, 0);
}
});
});

TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
StoreInfo parentStore = parentControllerClient.getStore(storeName).getStore();
Assert.assertEquals(parentStore.getVersion(1).get().getStatus(), VersionStatus.PUSHED);
});

// Version should be swapped in all regions
TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
Assert.assertEquals((int) version, 1);
});
});

TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
StoreInfo parentStore = parentControllerClient.getStore(storeName).getStore();
Assert.assertEquals(parentStore.getVersion(1).get().getStatus(), VersionStatus.ONLINE);
});
}
}

}
Loading

0 comments on commit 7e84caf

Please sign in to comment.