From 08ea0326b5fa22268d40cff59473a49a93daf32a Mon Sep 17 00:00:00 2001 From: Sergei Zaychenko Date: Fri, 22 Dec 2023 02:11:09 -0800 Subject: [PATCH] Minimizing flow tests randomness with fixed planned start time and flow trigger time --- src/app/cli/src/explore/api_server.rs | 7 +++- .../src/services/flow/flow_service.rs | 4 +- .../src/services/flow/flow_service_inmem.rs | 7 ++-- .../tests/tests/test_flow_service_inmem.rs | 42 ++++++++++++++----- 4 files changed, 45 insertions(+), 15 deletions(-) diff --git a/src/app/cli/src/explore/api_server.rs b/src/app/cli/src/explore/api_server.rs index de09722c4e..bc78e9a99f 100644 --- a/src/app/cli/src/explore/api_server.rs +++ b/src/app/cli/src/explore/api_server.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use dill::Catalog; use internal_error::*; +use kamu::domain::SystemTimeSource; use kamu_flow_system_inmem::domain::FlowService; use kamu_task_system_inmem::domain::TaskExecutor; @@ -24,6 +25,7 @@ pub struct APIServer { >, task_executor: Arc, flow_service: Arc, + time_source: Arc, } impl APIServer { @@ -39,6 +41,8 @@ impl APIServer { let flow_service = base_catalog.get_one().unwrap(); + let time_source = base_catalog.get_one().unwrap(); + let gql_schema = kamu_adapter_graphql::schema(); let app = axum::Router::new() @@ -89,6 +93,7 @@ impl APIServer { server, task_executor, flow_service, + time_source, } } @@ -100,7 +105,7 @@ impl APIServer { tokio::select! 
{ res = self.server => { res.int_err() }, res = self.task_executor.run() => { res.int_err() }, - res = self.flow_service.run() => { res.int_err() } + res = self.flow_service.run(self.time_source.now()) => { res.int_err() } } } } diff --git a/src/domain/flow-system/src/services/flow/flow_service.rs b/src/domain/flow-system/src/services/flow/flow_service.rs index 32b583d640..08eda99f64 100644 --- a/src/domain/flow-system/src/services/flow/flow_service.rs +++ b/src/domain/flow-system/src/services/flow/flow_service.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use chrono::{DateTime, Utc}; use event_sourcing::LoadError; use internal_error::{ErrorIntoInternal, InternalError}; use kamu_core::DatasetNotFoundError; @@ -20,11 +21,12 @@ use crate::{DatasetFlowType, FlowID, FlowKey, FlowState, SystemFlowType}; #[async_trait::async_trait] pub trait FlowService: Sync + Send { /// Runs the update main loop - async fn run(&self) -> Result<(), InternalError>; + async fn run(&self, planned_start_time: DateTime) -> Result<(), InternalError>; /// Triggers the specified flow manually, unless it's already waiting async fn trigger_manual_flow( &self, + trigger_time: DateTime, flow_key: FlowKey, initiator_account_id: AccountID, initiator_account_name: AccountName, diff --git a/src/infra/flow-system-inmem/src/services/flow/flow_service_inmem.rs b/src/infra/flow-system-inmem/src/services/flow/flow_service_inmem.rs index 130f36e618..1ca82daf9e 100644 --- a/src/infra/flow-system-inmem/src/services/flow/flow_service_inmem.rs +++ b/src/infra/flow-system-inmem/src/services/flow/flow_service_inmem.rs @@ -412,9 +412,9 @@ impl FlowServiceInMemory { impl FlowService for FlowServiceInMemory { /// Runs the update main loop #[tracing::instrument(level = "info", skip_all)] - async fn run(&self) -> Result<(), InternalError> { + async fn run(&self, planned_start_time: DateTime) -> Result<(), InternalError> { // Initial 
scheduling - let start_time = self.round_time(self.time_source.now())?; + let start_time = self.round_time(planned_start_time)?; self.initialize_auto_polling_flows_from_configurations(start_time) .await?; @@ -468,6 +468,7 @@ impl FlowService for FlowServiceInMemory { )] async fn trigger_manual_flow( &self, + trigger_time: DateTime, flow_key: FlowKey, initiator_account_id: AccountID, initiator_account_name: AccountName, @@ -487,7 +488,7 @@ impl FlowService for FlowServiceInMemory { // Otherwise, initiate a new flow and activate it at the nearest scheduler slot None => { let mut flow = self.make_new_flow(flow_key, trigger).await?; - let activation_time = self.round_time(self.time_source.now())?; + let activation_time = self.round_time(trigger_time)?; self.enqueue_flow(flow.flow_id, activation_time)?; flow.activate_at_time(self.time_source.now(), activation_time) diff --git a/src/infra/flow-system-inmem/tests/tests/test_flow_service_inmem.rs b/src/infra/flow-system-inmem/tests/tests/test_flow_service_inmem.rs index 497d5d91e5..7b382b7ecb 100644 --- a/src/infra/flow-system-inmem/tests/tests/test_flow_service_inmem.rs +++ b/src/infra/flow-system-inmem/tests/tests/test_flow_service_inmem.rs @@ -11,7 +11,7 @@ use std::assert_matches::assert_matches; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Duration, DurationRound, Utc}; use dill::*; use event_bus::{AsyncEventHandler, EventBus}; use kamu::testing::MetadataFactory; @@ -47,8 +47,14 @@ async fn test_read_initial_config_and_queue_properly() { ) .await; + // Remember start time + let start_time = Utc::now() + .duration_round(Duration::milliseconds(SCHEDULING_ALIGNMENT_MS)) + .unwrap(); + + // Run scheduler concurrently with manual triggers script let _ = tokio::select! 
{ - res = harness.flow_service.run() => res.int_err(), + res = harness.flow_service.run(start_time) => res.int_err(), _ = tokio::time::sleep(std::time::Duration::from_millis(60)) => Ok(()), } .unwrap(); @@ -65,7 +71,7 @@ async fn test_read_initial_config_and_queue_properly() { let foo_moment = state.snapshots[1].0; let bar_moment = state.snapshots[2].0; - assert!(start_moment < foo_moment && foo_moment < bar_moment); + assert_eq!(start_time, start_moment); assert_eq!((foo_moment - start_moment), Duration::milliseconds(30)); // planned time for "foo" assert_eq!((bar_moment - start_moment), Duration::milliseconds(45)); // planned time for "bar" @@ -151,18 +157,26 @@ async fn test_manual_trigger() { let foo_flow_key: FlowKey = FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::Ingest).into(); let bar_flow_key: FlowKey = FlowKeyDataset::new(bar_id.clone(), DatasetFlowType::Ingest).into(); + // Remember start time + let start_time = Utc::now() + .duration_round(Duration::milliseconds(SCHEDULING_ALIGNMENT_MS)) + .unwrap(); + + // Run scheduler concurrently with manual triggers script let _ = tokio::select! 
{ - res = harness.flow_service.run() => res.int_err(), + res = harness.flow_service.run(start_time) => res.int_err(), _ = async { // Sleep < "foo" period tokio::time::sleep(std::time::Duration::from_millis(18)).await; - harness.trigger_manual_flow(foo_flow_key.clone()).await; // "foo" pending already - harness.trigger_manual_flow(bar_flow_key.clone()).await; // "bar" not queued, starts soon + let new_time = start_time + Duration::milliseconds(18); + harness.trigger_manual_flow(new_time, foo_flow_key.clone()).await; // "foo" pending already + harness.trigger_manual_flow(new_time, bar_flow_key.clone()).await; // "bar" not queued, starts soon // Wake up after foo scheduling tokio::time::sleep(std::time::Duration::from_millis(20)).await; - harness.trigger_manual_flow(foo_flow_key.clone()).await; // "foo" pending already, even running - harness.trigger_manual_flow(bar_flow_key.clone()).await; // "bar" pending already, event running + let new_time = new_time + Duration::milliseconds(20); + harness.trigger_manual_flow(new_time, foo_flow_key.clone()).await; // "foo" pending already, even running + harness.trigger_manual_flow(new_time, bar_flow_key.clone()).await; // "bar" pending already, event running // Make sure nothing got scheduled in near time tokio::time::sleep(std::time::Duration::from_millis(10)).await; @@ -180,6 +194,7 @@ async fn test_manual_trigger() { let bar_moment = state.snapshots[1].0; let foo_moment = state.snapshots[2].0; + assert_eq!(start_moment, start_time); assert_eq!((bar_moment - start_moment), Duration::milliseconds(20)); // next slot after 18ms trigger with 5ms align assert_eq!((foo_moment - start_moment), Duration::milliseconds(30)); // 30ms as planned @@ -330,6 +345,10 @@ impl AsyncEventHandler for TestFlowSystemListe ///////////////////////////////////////////////////////////////////////////////////////// +const SCHEDULING_ALIGNMENT_MS: i64 = 5; + +///////////////////////////////////////////////////////////////////////////////////////// + 
struct FlowHarness { _tmp_dir: tempfile::TempDir, catalog: dill::Catalog, @@ -347,7 +366,9 @@ impl FlowHarness { let catalog = dill::CatalogBuilder::new() .add::() - .add_value(FlowServiceRunConfig::new(Duration::milliseconds(5))) + .add_value(FlowServiceRunConfig::new(Duration::milliseconds( + SCHEDULING_ALIGNMENT_MS, + ))) .add::() .add::() .add::() @@ -423,9 +444,10 @@ impl FlowHarness { task_ids } - async fn trigger_manual_flow(&self, flow_key: FlowKey) { + async fn trigger_manual_flow(&self, trigger_time: DateTime, flow_key: FlowKey) { self.flow_service .trigger_manual_flow( + trigger_time, flow_key, FAKE_ACCOUNT_ID.to_string(), AccountName::new_unchecked(auth::DEFAULT_ACCOUNT_NAME),