diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 8aacd4e5daa..145e73c5c44 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -2398,4 +2398,8 @@ private ConfigKeys() { * Default: {@value com.linkedin.venice.meta.NameRepository#DEFAULT_MAXIMUM_ENTRY_COUNT} */ public static final String NAME_REPOSITORY_MAX_ENTRY_COUNT = "name.repository.max.entry.count"; + + public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS = "controller.deferred.version.swap.sleep.ms"; + public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED = + "controller.deferred.version.swap.service.enabled"; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwap.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwap.java new file mode 100644 index 00000000000..b71eafc8f8e --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwap.java @@ -0,0 +1,131 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED; +import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob; +import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA; +import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; +import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; + +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.StoreInfo; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Utils; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestDeferredVersionSwap { + private static final int NUMBER_OF_CHILD_DATACENTERS = 2; + private static final int NUMBER_OF_CLUSTERS = 1; + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + private static final String TARGET_REGION = "dc-0"; + + private static final String[] CLUSTER_NAMES = + IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); + + @BeforeClass + public void setUp() { + Properties controllerProps = new Properties(); + controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS, 30000); + controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED, true); + Properties serverProperties = new 
Properties(); + + multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + NUMBER_OF_CHILD_DATACENTERS, + NUMBER_OF_CLUSTERS, + 1, + 1, + 1, + 1, + 1, + Optional.of(controllerProps), + Optional.of(controllerProps), + Optional.of(serverProperties)); + } + + @AfterClass(alwaysRun = true) + public void cleanUp() { + Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); + } + + @Test + public void testDeferredVersionSwap() throws IOException { + File inputDir = getTempDataDirectory(); + TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100); + // Setup job properties + String inputDirPath = "file://" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("testDeferredVersionSwap"); + Properties props = + IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName); + String keySchemaStr = "\"string\""; + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true); + storeParms.setTargetRegionSwapWaitTime(1); + storeParms.setTargetRegionSwap(TARGET_REGION); + String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString(); + + try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) { + createStoreForJob(CLUSTER_NAMES[0], keySchemaStr, NAME_RECORD_V3_SCHEMA.toString(), props, storeParms).close(); + + // Start push job with target region push enabled + props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + TestWriteUtils.runPushJob("Test push job", props); + TestUtils.waitForNonDeterministicPushCompletion( + Version.composeKafkaTopic(storeName, 1), + parentControllerClient, + 30, + TimeUnit.SECONDS); + + // Version should only be swapped in the target region + TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> { + Map coloVersions = + parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions(); + + coloVersions.forEach((colo, version) -> { + if (colo.equals(TARGET_REGION)) { + Assert.assertEquals((int) version, 1); + } else { + Assert.assertEquals((int) version, 0); + } + }); + }); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + StoreInfo parentStore = parentControllerClient.getStore(storeName).getStore(); + Assert.assertEquals(parentStore.getVersion(1).get().getStatus(), VersionStatus.PUSHED); + }); + + // Version should be swapped in all regions + TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> { + Map coloVersions = + parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions(); + + coloVersions.forEach((colo, version) -> { + Assert.assertEquals((int) version, 1); + }); + }); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + StoreInfo parentStore = parentControllerClient.getStore(storeName).getStore(); + Assert.assertEquals(parentStore.getVersion(1).get().getStatus(), VersionStatus.ONLINE); + }); + } + } + +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java new file mode 100644 index 00000000000..427c68ab24e --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java @@ -0,0 +1,307 @@ +package com.linkedin.venice.controller; + +import static 
com.linkedin.venice.meta.VersionStatus.ERROR; +import static com.linkedin.venice.meta.VersionStatus.ONLINE; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.linkedin.venice.controller.stats.DeferredVersionSwapStats; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.meta.ReadWriteStoreRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreInfo; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.service.AbstractVeniceService; +import com.linkedin.venice.utils.RegionUtils; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This service is in charge of swapping to a new version after a specified wait time in the remaining regions of a target region push if enabled. + * The wait time is specified through a store/version level config (target_swap_region_wait_time) and the default wait time is 60m. + */ +public class DeferredVersionSwapService extends AbstractVeniceService { + private final AtomicBoolean stop = new AtomicBoolean(false); + private final Set allClusters; + private final VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig; + private final VeniceParentHelixAdmin veniceParentHelixAdmin; + private final ScheduledExecutorService deferredVersionSwapExecutor = Executors.newSingleThreadScheduledExecutor(); + private final DeferredVersionSwapStats deferredVersionSwapStats; + private static final Logger LOGGER = LogManager.getLogger(DeferredVersionSwapService.class); + private Cache> storePushCompletionTimeCache = + Caffeine.newBuilder().expireAfterWrite(2, TimeUnit.HOURS).build(); + + public DeferredVersionSwapService( + VeniceParentHelixAdmin admin, + VeniceControllerMultiClusterConfig multiClusterConfig, + DeferredVersionSwapStats deferredVersionSwapStats) { + this.veniceParentHelixAdmin = admin; + this.allClusters = multiClusterConfig.getClusters(); + this.veniceControllerMultiClusterConfig = multiClusterConfig; + this.deferredVersionSwapStats = deferredVersionSwapStats; + } + + @Override + public boolean startInner() throws Exception { + deferredVersionSwapExecutor.scheduleAtFixedRate( + new DeferredVersionSwapTask(), + 0, + veniceControllerMultiClusterConfig.getDeferredVersionSwapSleepMs(), + TimeUnit.MILLISECONDS); + return true; + } + + @Override + public void stopInner() throws Exception { + stop.set(true); + deferredVersionSwapExecutor.shutdown(); + } + + private Set getRegionsForVersionSwap(Map candidateRegions, Set targetRegions) { + Set remainingRegions = new HashSet<>(candidateRegions.keySet()); + remainingRegions.removeAll(targetRegions); + return remainingRegions; + } + + private String getTargetRegion(Set targetRegions) { + return targetRegions.iterator().next(); + } + + private StoreResponse getStoreForRegion(String clusterName, String targetRegion, String storeName) { + Map 
controllerClientMap = + veniceParentHelixAdmin.getVeniceHelixAdmin().getControllerClientMap(clusterName); + ControllerClient targetRegionControllerClient = controllerClientMap.get(targetRegion); + return targetRegionControllerClient.getStore(storeName); + } + + private boolean didWaitTimeElapseInTargetRegions( + Map completionTimes, + Set targetRegions, + int waitTime) { + boolean didWaitTimeElapseInTargetRegions = true; + for (String targetRegion: targetRegions) { + long completionTime = completionTimes.get(targetRegion); + long storeWaitTime = TimeUnit.MINUTES.toSeconds(waitTime); + long currentTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC); + if ((completionTime + storeWaitTime) > currentTime) { + didWaitTimeElapseInTargetRegions = false; + } + } + + return didWaitTimeElapseInTargetRegions; + } + + private class DeferredVersionSwapTask implements Runnable { + @Override + public void run() { + while (!stop.get()) { + try { + for (String cluster: allClusters) { + if (!veniceParentHelixAdmin.isLeaderControllerFor(cluster)) { + continue; + } + + List stores = veniceParentHelixAdmin.getAllStores(cluster); + for (Store store: stores) { + if (StringUtils.isEmpty(store.getTargetSwapRegion())) { + continue; + } + + int targetVersionNum = store.getLargestUsedVersionNumber(); + Version targetVersion = store.getVersion(targetVersionNum); + if (targetVersion == null) { + continue; + } + + // The store is eligible for a version swap if its push job is in terminal status. For a target region + // push, the parent version status is set to PUSHED in getOfflinePushStatus when this happens + if (targetVersion.getStatus() != VersionStatus.PUSHED) { + continue; + } + + // If we have a cached push completion for this store, check that the waitTime has elapsed before + // proceeding further + String storeName = store.getName(); + String kafkaTopicName = Version.composeKafkaTopic(storeName, targetVersionNum); + Set targetRegions = RegionUtils.parseRegionsFilterList(store.getTargetSwapRegion()); + Map storePushCompletionTimes = storePushCompletionTimeCache.getIfPresent(kafkaTopicName); + if (storePushCompletionTimes != null) { + if (!didWaitTimeElapseInTargetRegions( + storePushCompletionTimes, + targetRegions, + store.getTargetSwapRegionWaitTime())) { + LOGGER.info( + "Skipping version swap for store: {} on version: {} as wait time: {} has not passed", + storeName, + targetVersionNum, + store.getTargetSwapRegionWaitTime()); + continue; + } + } + + Map coloToVersions = + veniceParentHelixAdmin.getCurrentVersionsForMultiColos(cluster, storeName); + Set remainingRegions = getRegionsForVersionSwap(coloToVersions, targetRegions); + + StoreResponse targetRegionStoreResponse = + getStoreForRegion(cluster, getTargetRegion(targetRegions), storeName); + if (targetRegionStoreResponse.isError()) { + LOGGER.warn("Got error when fetching targetRegionStore: {}", targetRegionStoreResponse.getError()); + continue; + } + + StoreInfo targetRegionStore = targetRegionStoreResponse.getStore(); + Optional version = targetRegionStore.getVersion(targetVersionNum); + if (!version.isPresent()) { + LOGGER.warn( + "Unable to find version {} for store: {} in regions: {}", + targetVersionNum, + storeName, + store.getTargetSwapRegion()); + continue; + } + + // Do not perform version swap for davinci stores + // TODO remove this check once DVC delayed ingestion is completed + if (version.get().getIsDavinciHeartbeatReported()) { + LOGGER.info( + "Skipping version swap for store: {} on version: {} as it is davinci", + storeName, + 
targetVersionNum);
+                continue;
+              }
+
+              // Check that push is completed in target regions
+              Admin.OfflinePushStatusInfo pushStatusInfo =
+                  veniceParentHelixAdmin.getOffLinePushStatus(cluster, kafkaTopicName);
+              Set<String> targetRegionsCompleted = new HashSet<>();
+              for (String targetRegion: targetRegions) {
+                String executionStatus = pushStatusInfo.getExtraInfo().get(targetRegion);
+                if (executionStatus.equals(ExecutionStatus.COMPLETED.toString())) {
+                  targetRegionsCompleted.add(targetRegion);
+                } else {
+                  LOGGER.warn(
+                      "Push is not complete in target region {} for store: {} on version: {}",
+                      targetRegion,
+                      storeName,
+                      targetVersionNum);
+                }
+              }
+
+              if (targetRegionsCompleted.size() < targetRegions.size() / 2) {
+                LOGGER.warn(
+                    "Skipping version swap for store: {} on version: {} as push is not complete in at least half of the target regions. "
+                        + "Completed target regions: {}, target regions: {}",
+                    storeName,
+                    targetVersionNum,
+                    targetRegionsCompleted,
+                    targetRegions);
+                continue;
+              }
+
+              // Check that push is complete in non target regions
+              int numNonTargetRegionsFailed = 0;
+              Set<String> nonTargetRegionsCompleted = new HashSet<>();
+              for (String remainingRegion: remainingRegions) {
+                String executionStatus = pushStatusInfo.getExtraInfo().get(remainingRegion);
+                if (executionStatus.equals(ExecutionStatus.ERROR.toString())) {
+                  numNonTargetRegionsFailed += 1;
+                  LOGGER.warn(
+                      "Push has error status for store: {} on version: {} in a non target region: {}",
+                      storeName,
+                      targetVersionNum,
+                      remainingRegion);
+                } else if (executionStatus.equals(ExecutionStatus.COMPLETED.toString())) {
+                  nonTargetRegionsCompleted.add(remainingRegion);
+                }
+              }
+
+              // If the majority of the remaining regions have failed their push jobs, mark the version status as ERROR
+              // so that we don't check this store again for this version
+              HelixVeniceClusterResources resources =
+                  veniceParentHelixAdmin.getVeniceHelixAdmin().getHelixVeniceClusterResources(cluster);
+              ReadWriteStoreRepository repository = resources.getStoreMetadataRepository();
+              if (numNonTargetRegionsFailed > remainingRegions.size() / 2) {
+                LOGGER.warn(
+                    "Skipping version swap for store: {} on version: {} as majority of non target regions have failed",
+                    storeName,
+                    targetVersionNum);
+                store.updateVersionStatus(targetVersionNum, ERROR);
+                repository.updateStore(store);
+                continue;
+              }
+
+              // Do not perform a version swap if:
+              // 1. The majority of the remaining regions have not completed their push yet
+              // 2. Any of the remaining regions has yet to reach a terminal status (COMPLETED or ERROR), as we need to
+              //    wait for all of the remaining regions to reach a terminal status, to account for cases where we
+              //    have 3 remaining regions and 2 are COMPLETED, but 1 is still STARTED
+              int nonTargetRegionsInTerminalStatus = nonTargetRegionsCompleted.size() + numNonTargetRegionsFailed;
+              if (nonTargetRegionsCompleted.size() < remainingRegions.size() / 2
+                  || nonTargetRegionsInTerminalStatus != remainingRegions.size()) {
+                LOGGER.info(
+                    "Skipping version swap for store: {} on version: {} as majority of non target regions have not completed their push",
+                    storeName,
+                    targetVersionNum);
+                continue;
+              }
+
+              // Check that waitTime has elapsed in target regions
+              boolean didWaitTimeElapseInTargetRegions = didWaitTimeElapseInTargetRegions(
+                  pushStatusInfo.getExtraInfoUpdateTimestamp(),
+                  targetRegions,
+                  store.getTargetSwapRegionWaitTime());
+
+              if (!didWaitTimeElapseInTargetRegions) {
+                LOGGER.info(
+                    "Skipping version swap for store: {} on version: {} as wait time: {} has not passed",
+                    storeName,
+                    targetVersionNum,
+                    store.getTargetSwapRegionWaitTime());
+                storePushCompletionTimeCache.put(kafkaTopicName, pushStatusInfo.getExtraInfoUpdateTimestamp());
+                continue;
+              }
+
+              // TODO add call for postStoreVersionSwap() once it is implemented
+
+              String regionsToRollForward = String.join(",\\s*", nonTargetRegionsCompleted);
+              LOGGER.info("Issuing roll forward message for store: {} in regions: {}", storeName, regionsToRollForward);
+              veniceParentHelixAdmin.rollForwardToFutureVersion(cluster, storeName, regionsToRollForward);
+
+              // Once version is swapped in the remaining regions, update parent status to ONLINE so that we don't check
+              // this version for version swap again
+              store.updateVersionStatus(targetVersionNum, ONLINE);
+              repository.updateStore(store);
+              LOGGER.info(
+                  "Updated parent version status to online for version: {} in store: {}",
+                  targetVersionNum,
+                  storeName);
+            }
+          }
+        } catch (Exception e) {
+          LOGGER.warn("Caught exception while performing deferred version swap", e);
+          deferredVersionSwapStats.recordDeferredVersionSwapErrorSensor();
+        } catch (Throwable throwable) {
+          LOGGER.warn("Caught a throwable while performing deferred version swap", throwable);
+          deferredVersionSwapStats.recordDeferreredVersionSwapThrowableSensor();
+        }
+      }
+    }
+  }
+}
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java
index 3ccb0d1dc64..16cc1400deb 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java
@@ -18,6 +18,7 @@
 import com.linkedin.venice.controller.server.AdminSparkServer;
 import com.linkedin.venice.controller.server.VeniceControllerGrpcServiceImpl;
 import com.linkedin.venice.controller.server.VeniceControllerRequestHandler;
+import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
 import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
 import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
 import com.linkedin.venice.controller.systemstore.SystemStoreRepairService;
@@ -74,6 +75,8 @@ public class VeniceController {
   private final Optional<StoreGraveyardCleanupService> storeGraveyardCleanupService;
   private final Optional<SystemStoreRepairService> systemStoreRepairService;
 
+  private Optional<DeferredVersionSwapService>
deferredVersionSwapService; + private VeniceControllerRequestHandler secureRequestHandler; private VeniceControllerRequestHandler unsecureRequestHandler; private ThreadPoolExecutor grpcExecutor = null; @@ -164,6 +167,7 @@ public VeniceController(VeniceControllerContext ctx) { this.unusedValueSchemaCleanupService = createUnusedValueSchemaCleanupService(); this.storeGraveyardCleanupService = createStoreGraveyardCleanupService(); this.systemStoreRepairService = createSystemStoreRepairService(); + this.deferredVersionSwapService = createDeferredVersionSwapService(); if (multiClusterConfigs.isGrpcServerEnabled()) { initializeGrpcServer(); } @@ -277,6 +281,18 @@ private Optional createUnusedValueSchemaCleanup return Optional.empty(); } + private Optional createDeferredVersionSwapService() { + if (multiClusterConfigs.isParent() && multiClusterConfigs.isDeferredVersionSwapServiceEnabled()) { + Admin admin = controllerService.getVeniceHelixAdmin(); + return Optional.of( + new DeferredVersionSwapService( + (VeniceParentHelixAdmin) admin, + multiClusterConfigs, + new DeferredVersionSwapStats(metricsRepository))); + } + return Optional.empty(); + } + // package-private for testing private void initializeGrpcServer() { LOGGER.info("Initializing gRPC server as it is enabled for the controller..."); @@ -371,6 +387,7 @@ public void start() { unusedValueSchemaCleanupService.ifPresent(AbstractVeniceService::start); systemStoreRepairService.ifPresent(AbstractVeniceService::start); disabledPartitionEnablerService.ifPresent(AbstractVeniceService::start); + deferredVersionSwapService.ifPresent(AbstractVeniceService::start); // register with service discovery at the end asyncRetryingServiceDiscoveryAnnouncer.register(); if (adminGrpcServer != null) { @@ -435,6 +452,7 @@ public void stop() { unusedValueSchemaCleanupService.ifPresent(Utils::closeQuietlyWithErrorLogged); storeBackupVersionCleanupService.ifPresent(Utils::closeQuietlyWithErrorLogged); disabledPartitionEnablerService.ifPresent(Utils::closeQuietlyWithErrorLogged); + deferredVersionSwapService.ifPresent(Utils::closeQuietlyWithErrorLogged); if (adminGrpcServer != null) { adminGrpcServer.stop(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java index ca516b90fcd..16c044253e0 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java @@ -42,6 +42,8 @@ import static com.linkedin.venice.ConfigKeys.CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DANGLING_TOPIC_OCCURRENCE_THRESHOLD_FOR_CLEANUP; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFAULT_READ_QUOTA_PER_ROUTER; +import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED; +import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_REPLICA_ENABLER_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_ROUTES; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLE_PARENT_REQUEST_TOPIC_FOR_STREAM_PUSHES; @@ -544,6 +546,8 @@ public class VeniceControllerClusterConfig { private Set pushJobUserErrorCheckpoints; private boolean 
isHybridStorePartitionCountUpdateEnabled; + private final long deferredVersionSwapSleepMs; + private final boolean deferredVersionSwapServiceEnabled; public VeniceControllerClusterConfig(VeniceProperties props) { this.props = props; @@ -996,6 +1000,9 @@ public VeniceControllerClusterConfig(VeniceProperties props) { this.pushJobUserErrorCheckpoints = parsePushJobUserErrorCheckpoints(props); this.isHybridStorePartitionCountUpdateEnabled = props.getBoolean(ConfigKeys.CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE, false); + this.deferredVersionSwapSleepMs = + props.getLong(CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS, TimeUnit.MINUTES.toMillis(1)); + this.deferredVersionSwapServiceEnabled = props.getBoolean(CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED, false); } public VeniceProperties getProps() { @@ -1506,6 +1513,14 @@ public int getGrpcServerThreadCount() { return grpcServerThreadCount; } + public long getDeferredVersionSwapSleepMs() { + return deferredVersionSwapSleepMs; + } + + public boolean isDeferredVersionSwapServiceEnabled() { + return deferredVersionSwapServiceEnabled; + } + public long getTerminalStateTopicCheckerDelayMs() { return terminalStateTopicCheckerDelayMs; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java index 0de71246bae..ed51095c291 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java @@ -223,6 +223,14 @@ public long getBackupVersionCleanupSleepMs() { return getCommonConfig().getBackupVersionCleanupSleepMs(); } + public long getDeferredVersionSwapSleepMs() { + return getCommonConfig().getDeferredVersionSwapSleepMs(); + } + + public boolean isDeferredVersionSwapServiceEnabled() { + return getCommonConfig().isDeferredVersionSwapServiceEnabled(); + } + public boolean isControllerEnforceSSLOnly() { return getCommonConfig().isControllerEnforceSSLOnly(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 8adccb1e83b..5e0a19fc557 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -3672,7 +3672,7 @@ private OfflinePushStatusInfo getOffLineJobStatus( Store parentStore = repository.getStore(storeName); Version version = parentStore.getVersion(versionNum); boolean isDeferredSwap = version != null && version.isVersionSwapDeferred(); - if (!isDeferredSwap) { + if (!isDeferredSwap || !StringUtils.isEmpty(targetedRegions)) { // targetedRegions is non-empty for target region push of batch store boolean isTargetRegionPush = !StringUtils.isEmpty(targetedRegions); Version storeVersion = parentStore.getVersion(versionNum); @@ -3696,12 +3696,15 @@ private OfflinePushStatusInfo getOffLineJobStatus( } // status PUSHED is set when batch store's target region push is completed, but other region are yet to // complete - if (isTargetRegionPush && !isVersionPushed) { - parentStore.updateVersionStatus(versionNum, PUSHED); - repository.updateStore(parentStore); - } else { // 
status ONLINE is set when all region finishes ingestion for either regular or target region push. - parentStore.updateVersionStatus(versionNum, ONLINE); - repository.updateStore(parentStore); + if (currentReturnStatus.equals(ExecutionStatus.COMPLETED)) { + if (isTargetRegionPush && !isVersionPushed) { + parentStore.updateVersionStatus(versionNum, PUSHED); + repository.updateStore(parentStore); + } else { // status ONLINE is set when all region finishes ingestion for either regular or target region + // push. + parentStore.updateVersionStatus(versionNum, ONLINE); + repository.updateStore(parentStore); + } } } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DeferredVersionSwapStats.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DeferredVersionSwapStats.java new file mode 100644 index 00000000000..aaf7db3df21 --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DeferredVersionSwapStats.java @@ -0,0 +1,28 @@ +package com.linkedin.venice.controller.stats; + +import com.linkedin.venice.stats.AbstractVeniceStats; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; +import io.tehuti.metrics.stats.Count; + + +public class DeferredVersionSwapStats extends AbstractVeniceStats { + private final Sensor deferredVersionSwapErrorSensor; + private final Sensor deferredVersionSwapThrowableSensor; + private final static String DEFERRED_VERSION_SWAP_ERROR = "deferred_version_swap_error"; + private final static String DEFERRED_VERSION_SWAP_THROWABLE = "deferred_version_swap_throwable"; + + public DeferredVersionSwapStats(MetricsRepository metricsRepository) { + super(metricsRepository, "DeferredVersionSwap"); + deferredVersionSwapErrorSensor = registerSensorIfAbsent(DEFERRED_VERSION_SWAP_ERROR, new Count()); + deferredVersionSwapThrowableSensor = registerSensorIfAbsent(DEFERRED_VERSION_SWAP_THROWABLE, new Count()); + } + + public void recordDeferredVersionSwapErrorSensor() { + deferredVersionSwapErrorSensor.record(); + } + + public void recordDeferreredVersionSwapThrowableSensor() { + deferredVersionSwapThrowableSensor.record(); + } +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDeferredVersionSwapService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDeferredVersionSwapService.java new file mode 100644 index 00000000000..c1720bef6da --- /dev/null +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDeferredVersionSwapService.java @@ -0,0 +1,437 @@ +package com.linkedin.venice.controller; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.linkedin.venice.controller.stats.DeferredVersionSwapStats; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.meta.ReadWriteStoreRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreInfo; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.utils.TestUtils; +import java.time.LocalDateTime; +import 
java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class TestDeferredVersionSwapService { + private VeniceParentHelixAdmin admin; + private VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig; + private static final String clusterName = "testCluster"; + + private static final String region1 = "test1"; + private static final String region2 = "test2"; + private static final String region3 = "test3"; + + @BeforeMethod + public void setUp() { + admin = mock(VeniceParentHelixAdmin.class); + veniceControllerMultiClusterConfig = mock(VeniceControllerMultiClusterConfig.class); + + Set clusters = new HashSet<>(); + clusters.add(clusterName); + doReturn(clusters).when(veniceControllerMultiClusterConfig).getClusters(); + doReturn(10L).when(veniceControllerMultiClusterConfig).getDeferredVersionSwapSleepMs(); + doReturn(true).when(veniceControllerMultiClusterConfig).isDeferredVersionSwapServiceEnabled(); + } + + private Store mockStore( + int currentVersion, + int waitTime, + String targetRegions, + Map versions, + String name) { + Store store = mock(Store.class); + doReturn(waitTime).when(store).getTargetSwapRegionWaitTime(); + doReturn(targetRegions).when(store).getTargetSwapRegion(); + doReturn(currentVersion).when(store).getCurrentVersion(); + + List versionList = new ArrayList<>(); + versions.forEach((n, s) -> { + Version v = mock(Version.class); + doReturn(n).when(v).getNumber(); + doReturn(s).when(v).getStatus(); + doReturn(v).when(store).getVersion(n); + versionList.add(v); + }); + doReturn(versionList).when(store).getVersions(); + doReturn(name).when(store).getName(); + + return store; + } + + private Admin.OfflinePushStatusInfo getOfflinePushStatusInfo( + String region1Status, + String region2Status, + String region3Status, + Long region1Timestamp, + Long region2Timestamp, + Long region3Timestamp) { + Admin.OfflinePushStatusInfo pushStatusInfo = + getOfflinePushStatusInfo(region1Status, region2Status, region1Timestamp, region2Timestamp); + pushStatusInfo.getExtraInfoUpdateTimestamp().put(region3, region3Timestamp); + pushStatusInfo.getExtraInfo().put(region3, region3Status); + return pushStatusInfo; + } + + private Admin.OfflinePushStatusInfo getOfflinePushStatusInfo( + String region1Status, + String region2Status, + Long region1Timestamp, + Long region2Timestamp) { + Map extraInfo = new HashMap<>(); + extraInfo.put(region1, region1Status); + extraInfo.put(region2, region2Status); + + Map extraInfoUpdateTimestamp = new HashMap<>(); + extraInfoUpdateTimestamp.put(region1, region1Timestamp); + extraInfoUpdateTimestamp.put(region2, region2Timestamp); + + return new Admin.OfflinePushStatusInfo( + ExecutionStatus.COMPLETED, + 123L, + extraInfo, + null, + null, + extraInfoUpdateTimestamp); + } + + @Test + public void testDeferredVersionSwap() throws Exception { + Map versions = new HashMap<>(); + int targetVersionNum = 3; + int davinciVersionNum = 2; + int completedVersionNum = 1; + versions.put(completedVersionNum, VersionStatus.ONLINE); + versions.put(davinciVersionNum, VersionStatus.PUSHED); + versions.put(targetVersionNum, VersionStatus.PUSHED); + String storeName1 = "testStore"; + String storeName2 = "testStore2"; + String storeName3 = "testStore3"; + String storeName4 = "testStore4"; + String storeName5 = "testStore5"; + 
+    String storeName6 = "testStore6";
+    Store store1 = mockStore(davinciVersionNum, 60, region1, versions, storeName1);
+    Store store2 = mockStore(completedVersionNum, 60, region1, versions, storeName2);
+    Store store3 = mockStore(davinciVersionNum, 60, region1, versions, storeName3);
+    Store store4 = mockStore(davinciVersionNum, 60, region1, versions, storeName4);
+    Store store5 = mockStore(completedVersionNum, 60, region1, versions, storeName5);
+    Store store6 = mockStore(davinciVersionNum, 60, region1, versions, storeName6);
+
+    Long time = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
+    List<Store> storeList = new ArrayList<>();
+    storeList.add(store1);
+    storeList.add(store2);
+    storeList.add(store3);
+    storeList.add(store4);
+    storeList.add(store5);
+    storeList.add(store6);
+    doReturn(storeList).when(admin).getAllStores(clusterName);
+    doReturn(3).when(store1).getLargestUsedVersionNumber();
+    doReturn(2).when(store2).getLargestUsedVersionNumber();
+    doReturn(3).when(store3).getLargestUsedVersionNumber();
+    doReturn(3).when(store4).getLargestUsedVersionNumber();
+    doReturn(1).when(store5).getLargestUsedVersionNumber();
+    doReturn(3).when(store6).getLargestUsedVersionNumber();
+
+    Version davinciVersion = new VersionImpl(storeName2, davinciVersionNum);
+    Version targetVersion = new VersionImpl(storeName1, targetVersionNum);
+    davinciVersion.setIsDavinciHeartbeatReported(true);
+
+    ControllerClient controllerClient = mock(ControllerClient.class);
+    VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);
+    Map<String, ControllerClient> controllerClientMap = new HashMap<>();
+    controllerClientMap.put(region1, controllerClient);
+    StoreResponse storeResponse = new StoreResponse();
+    StoreInfo storeInfo = new StoreInfo();
+    List<Version> versionList = new ArrayList<>();
+    versionList.add(targetVersion);
+    versionList.add(davinciVersion);
+    storeInfo.setVersions(versionList);
+    storeResponse.setStore(storeInfo);
+
+    doReturn(storeResponse).when(controllerClient).getStore(any());
+    doReturn(veniceHelixAdmin).when(admin).getVeniceHelixAdmin();
+
+    HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
+    ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
+    doReturn(repository).when(resources).getStoreMetadataRepository();
+    doReturn(resources).when(veniceHelixAdmin).getHelixVeniceClusterResources(clusterName);
+    doReturn(store1).when(repository).getStore(storeName1);
+    doReturn(controllerClientMap).when(veniceHelixAdmin).getControllerClientMap(clusterName);
+
+    Map<String, Integer> coloToVersions = new HashMap<>();
+    Map<String, Integer> davinciColoToVersions = new HashMap<>();
+    coloToVersions.put(region1, 3);
+    coloToVersions.put(region2, 2);
+    davinciColoToVersions.put(region1, 2);
+    davinciColoToVersions.put(region2, 1);
+
+    doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName1);
+    doReturn(davinciColoToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName2);
+    doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName3);
+    doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName4);
+    doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName6);
+
+    Admin.OfflinePushStatusInfo offlinePushStatusInfoWithWaitTimeElapsed = getOfflinePushStatusInfo(
+        ExecutionStatus.COMPLETED.toString(),
+        ExecutionStatus.COMPLETED.toString(),
+        time - TimeUnit.MINUTES.toSeconds(90),
+        time - TimeUnit.MINUTES.toSeconds(30));
+    Admin.OfflinePushStatusInfo
offlinePushStatusInfoWithoutWaitTimeElapsed = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.COMPLETED.toString(), + time - TimeUnit.MINUTES.toSeconds(30), + time - TimeUnit.MINUTES.toSeconds(30)); + Admin.OfflinePushStatusInfo offlinePushStatusInfoWithOngoingPush = getOfflinePushStatusInfo( + ExecutionStatus.STARTED.toString(), + ExecutionStatus.COMPLETED.toString(), + time - TimeUnit.MINUTES.toSeconds(30), + time - TimeUnit.MINUTES.toSeconds(30)); + Admin.OfflinePushStatusInfo offlinePushStatusInfoWithFailedPush = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.ERROR.toString(), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(30)); + + String kafkaTopicName1 = Version.composeKafkaTopic(storeName1, targetVersionNum); + String kafkaTopicName2 = Version.composeKafkaTopic(storeName2, davinciVersionNum); + String kafkaTopicName3 = Version.composeKafkaTopic(storeName3, targetVersionNum); + String kafkaTopicName4 = Version.composeKafkaTopic(storeName4, targetVersionNum); + String kafkaTopicName6 = Version.composeKafkaTopic(storeName6, targetVersionNum); + doReturn(offlinePushStatusInfoWithWaitTimeElapsed).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName1); + doReturn(offlinePushStatusInfoWithWaitTimeElapsed).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName2); + doReturn(offlinePushStatusInfoWithoutWaitTimeElapsed).when(admin) + .getOffLinePushStatus(clusterName, kafkaTopicName3); + doReturn(offlinePushStatusInfoWithOngoingPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName4); + doReturn(offlinePushStatusInfoWithFailedPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName6); + doReturn(true).when(admin).isLeaderControllerFor(clusterName); + + DeferredVersionSwapService deferredVersionSwapService = + new DeferredVersionSwapService(admin, veniceControllerMultiClusterConfig, mock(DeferredVersionSwapStats.class)); + + deferredVersionSwapService.startInner(); + + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + // push completed in target region & wait time elapsed + verify(admin, atLeast(1)).rollForwardToFutureVersion(clusterName, storeName1, region2); + + // davinci store + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName2, region2); + + // push completed in target region & wait time NOT elapsed + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName3, region2); + + // push not completed in target region + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName4, region2); + + // push is complete in all regions + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName5, region2); + + // push failed in non target region + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName6, region2); + }); + } + + @Test + public void testDeferredVersionSwapNonTargetRegionStatuses() throws Exception { + Map versions = new HashMap<>(); + int targetVersionNum = 3; + int davinciVersionNum = 2; + int completedVersionNum = 1; + versions.put(completedVersionNum, VersionStatus.ONLINE); + versions.put(davinciVersionNum, VersionStatus.PUSHED); + versions.put(targetVersionNum, VersionStatus.PUSHED); + String storeName1 = "testStore"; + String storeName2 = "testStore2"; + String storeName3 = "testStore3"; + String storeName4 = "testStore4"; + String storeName5 = "testStore5"; + String storeName6 = "testStore6"; + Store store1 = mockStore(davinciVersionNum, 
60, region1, versions, storeName1); + Store store2 = mockStore(davinciVersionNum, 60, region1, versions, storeName2); + Store store3 = mockStore(davinciVersionNum, 60, region1, versions, storeName3); + Store store4 = mockStore(davinciVersionNum, 60, region1, versions, storeName4); + Store store5 = mockStore(davinciVersionNum, 60, region1, versions, storeName5); + Store store6 = mockStore(davinciVersionNum, 60, region1 + "," + region2, versions, storeName6); + + Long time = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC); + List storeList = new ArrayList<>(); + storeList.add(store1); + storeList.add(store2); + storeList.add(store3); + storeList.add(store4); + storeList.add(store5); + storeList.add(store6); + doReturn(storeList).when(admin).getAllStores(clusterName); + doReturn(3).when(store1).getLargestUsedVersionNumber(); + doReturn(3).when(store2).getLargestUsedVersionNumber(); + doReturn(3).when(store3).getLargestUsedVersionNumber(); + doReturn(3).when(store4).getLargestUsedVersionNumber(); + doReturn(3).when(store5).getLargestUsedVersionNumber(); + doReturn(3).when(store6).getLargestUsedVersionNumber(); + + Version targetVersion = new VersionImpl(storeName1, targetVersionNum); + + ControllerClient controllerClient = mock(ControllerClient.class); + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + Map controllerClientMap = new HashMap<>(); + controllerClientMap.put(region1, controllerClient); + controllerClientMap.put(region2, controllerClient); + StoreResponse storeResponse = new StoreResponse(); + StoreInfo storeInfo = new StoreInfo(); + List versionList = new ArrayList<>(); + versionList.add(targetVersion); + storeInfo.setVersions(versionList); + storeResponse.setStore(storeInfo); + + doReturn(storeResponse).when(controllerClient).getStore(any()); + doReturn(veniceHelixAdmin).when(admin).getVeniceHelixAdmin(); + + HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class); + ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class); + doReturn(repository).when(resources).getStoreMetadataRepository(); + doReturn(resources).when(veniceHelixAdmin).getHelixVeniceClusterResources(clusterName); + doReturn(store1).when(repository).getStore(storeName1); + doReturn(controllerClientMap).when(veniceHelixAdmin).getControllerClientMap(clusterName); + + Map coloToVersions = new HashMap<>(); + coloToVersions.put(region1, 3); + coloToVersions.put(region2, 2); + coloToVersions.put(region3, 2); + + doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(any(), any()); + + Admin.OfflinePushStatusInfo offlinePushStatusInfoWithOneOngoingPush = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.STARTED.toString(), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(30), + time - TimeUnit.MINUTES.toSeconds(30)); + + Admin.OfflinePushStatusInfo offlinePushStatusInfoWithMultipleOngoingPush = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.STARTED.toString(), + ExecutionStatus.STARTED.toString(), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(30), + time - TimeUnit.MINUTES.toSeconds(30)); + + Admin.OfflinePushStatusInfo offlinePushStatusInfoWithOneFailedPush = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.ERROR.toString(), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(30), 
+ time - TimeUnit.MINUTES.toSeconds(30)); + + Admin.OfflinePushStatusInfo offlinePushStatusInfoWithTwoFailedPush = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.ERROR.toString(), + ExecutionStatus.ERROR.toString(), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(30), + time - TimeUnit.MINUTES.toSeconds(30)); + + Admin.OfflinePushStatusInfo offlinePushStatusInfoWithOngoingFailedPush = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.STARTED.toString(), + ExecutionStatus.ERROR.toString(), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(30), + time - TimeUnit.MINUTES.toSeconds(30)); + + Admin.OfflinePushStatusInfo offlinePushStatusInfoCompletedFailedTargetRegions = getOfflinePushStatusInfo( + ExecutionStatus.COMPLETED.toString(), + ExecutionStatus.ERROR.toString(), + ExecutionStatus.COMPLETED.toString(), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(90), + time - TimeUnit.MINUTES.toSeconds(30)); + + String kafkaTopicName1 = Version.composeKafkaTopic(storeName1, targetVersionNum); + String kafkaTopicName2 = Version.composeKafkaTopic(storeName2, targetVersionNum); + String kafkaTopicName3 = Version.composeKafkaTopic(storeName3, targetVersionNum); + String kafkaTopicName4 = Version.composeKafkaTopic(storeName4, targetVersionNum); + String kafkaTopicName5 = Version.composeKafkaTopic(storeName5, targetVersionNum); + String kafkaTopicName6 = Version.composeKafkaTopic(storeName6, targetVersionNum); + doReturn(offlinePushStatusInfoWithOneOngoingPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName1); + doReturn(offlinePushStatusInfoWithMultipleOngoingPush).when(admin) + .getOffLinePushStatus(clusterName, kafkaTopicName2); + doReturn(offlinePushStatusInfoWithOneFailedPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName3); + doReturn(offlinePushStatusInfoWithTwoFailedPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName4); + doReturn(offlinePushStatusInfoWithOngoingFailedPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName5); + doReturn(offlinePushStatusInfoCompletedFailedTargetRegions).when(admin) + .getOffLinePushStatus(clusterName, kafkaTopicName6); + doReturn(true).when(admin).isLeaderControllerFor(clusterName); + + DeferredVersionSwapService deferredVersionSwapService = + new DeferredVersionSwapService(admin, veniceControllerMultiClusterConfig, mock(DeferredVersionSwapStats.class)); + + deferredVersionSwapService.startInner(); + + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + // one target region, 2 non target regions: one succeeded, one still in progress -> do not swap + verify(admin, never()) + .rollForwardToFutureVersion(clusterName, storeName1, String.join(",\\s*", region2, region3)); + + // one target region, 2 non target regions: both still in progress -> do not swap + verify(admin, never()) + .rollForwardToFutureVersion(clusterName, storeName2, String.join(",\\s*", region2, region3)); + + // one target region, 2 non target regions: one succeeded, one failed -> swap + verify(admin, atLeast(1)).rollForwardToFutureVersion(clusterName, storeName3, region2); + verify(store3, atLeast(1)).updateVersionStatus(3, VersionStatus.ONLINE); + + // one target region, 2 non target regions: both failed -> do not swap + verify(store4, atLeast(1)).updateVersionStatus(3, VersionStatus.ERROR); + + // one target region, 2 non target regions: one failed, one in progress -> 
do not swap
+      verify(admin, never())
+          .rollForwardToFutureVersion(clusterName, storeName5, String.join(",\\s*", region2, region3));
+
+      // two target regions: 1 completed, 1 failed, 1 completed non target region -> swap
+      verify(admin, atLeast(1)).rollForwardToFutureVersion(clusterName, storeName6, region3);
+      verify(store6, atLeast(1)).updateVersionStatus(3, VersionStatus.ONLINE);
+    });
+  }
+
+  @Test
+  public void testTargetRegionSwapNotEnabled() throws Exception {
+    Map<Integer, VersionStatus> versions = new HashMap<>();
+    versions.put(1, VersionStatus.ONLINE);
+    versions.put(2, VersionStatus.ONLINE);
+    String storeName = "testStore";
+    Store store = mockStore(2, 60, null, versions, storeName);
+
+    Set<String> clusters = new HashSet<>();
+    clusters.add(clusterName);
+    doReturn(clusters).when(veniceControllerMultiClusterConfig).getClusters();
+    List<Store> storeList = new ArrayList<>();
+    storeList.add(store);
+    doReturn(storeList).when(admin).getAllStores(clusterName);
+    doReturn(true).when(admin).isLeaderControllerFor(clusterName);
+
+    DeferredVersionSwapService deferredVersionSwapService =
+        new DeferredVersionSwapService(admin, veniceControllerMultiClusterConfig, mock(DeferredVersionSwapStats.class));
+    deferredVersionSwapService.startInner();
+
+    TestUtils.waitForNonDeterministicAssertion(
+        1,
+        TimeUnit.SECONDS,
+        () -> verify(admin, never()).rollForwardToFutureVersion(any(), any(), any()));
+  }
+}
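As a quick illustration of the wait-time gate described in the DeferredVersionSwapService Javadoc above, the sketch below mirrors the didWaitTimeElapseInTargetRegions check outside the controller: each target region's push completion timestamp (epoch seconds, UTC) plus the store-level target_swap_region_wait_time (in minutes) must be at or before the current time before the roll forward is issued. This is a minimal, self-contained sketch; the class and method names (DeferredSwapWaitTimeSketch, didWaitTimeElapse) are illustrative only and not part of this patch.

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Illustrative sketch (not part of this patch): per-region wait-time check
// equivalent to DeferredVersionSwapService#didWaitTimeElapseInTargetRegions.
public class DeferredSwapWaitTimeSketch {
  static boolean didWaitTimeElapse(Map<String, Long> completionTimesInEpochSeconds, int waitTimeInMinutes) {
    long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
    long waitTimeInSeconds = TimeUnit.MINUTES.toSeconds(waitTimeInMinutes);
    // The swap is allowed only once every target region completed at least waitTimeInMinutes ago.
    return completionTimesInEpochSeconds.values()
        .stream()
        .allMatch(completionTime -> completionTime + waitTimeInSeconds <= now);
  }

  public static void main(String[] args) {
    long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
    Map<String, Long> completionTimes = new HashMap<>();
    completionTimes.put("dc-0", now - TimeUnit.MINUTES.toSeconds(90)); // completed 90 minutes ago
    System.out.println(didWaitTimeElapse(completionTimes, 60)); // true: the 60-minute wait has elapsed
    completionTimes.put("dc-1", now - TimeUnit.MINUTES.toSeconds(30)); // completed 30 minutes ago
    System.out.println(didWaitTimeElapse(completionTimes, 60)); // false: dc-1 has not waited long enough yet
  }
}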