Skip to content

Commit

Permalink
Minimizing fow tests randomness with fixed planned start time and flo…
Browse files Browse the repository at this point in the history
…w trigger time
  • Loading branch information
zaychenko-sergei committed Dec 22, 2023
1 parent 0e1a94e commit 08ea032
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 15 deletions.
7 changes: 6 additions & 1 deletion src/app/cli/src/explore/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;

use dill::Catalog;
use internal_error::*;
use kamu::domain::SystemTimeSource;
use kamu_flow_system_inmem::domain::FlowService;
use kamu_task_system_inmem::domain::TaskExecutor;

Expand All @@ -24,6 +25,7 @@ pub struct APIServer {
>,
task_executor: Arc<dyn TaskExecutor>,
flow_service: Arc<dyn FlowService>,
time_source: Arc<dyn SystemTimeSource>,
}

impl APIServer {
Expand All @@ -39,6 +41,8 @@ impl APIServer {

let flow_service = base_catalog.get_one().unwrap();

let time_source = base_catalog.get_one().unwrap();

let gql_schema = kamu_adapter_graphql::schema();

let app = axum::Router::new()
Expand Down Expand Up @@ -89,6 +93,7 @@ impl APIServer {
server,
task_executor,
flow_service,
time_source,
}
}

Expand All @@ -100,7 +105,7 @@ impl APIServer {
tokio::select! {
res = self.server => { res.int_err() },
res = self.task_executor.run() => { res.int_err() },
res = self.flow_service.run() => { res.int_err() }
res = self.flow_service.run(self.time_source.now()) => { res.int_err() }
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/domain/flow-system/src/services/flow/flow_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use chrono::{DateTime, Utc};
use event_sourcing::LoadError;
use internal_error::{ErrorIntoInternal, InternalError};
use kamu_core::DatasetNotFoundError;
Expand All @@ -20,11 +21,12 @@ use crate::{DatasetFlowType, FlowID, FlowKey, FlowState, SystemFlowType};
#[async_trait::async_trait]
pub trait FlowService: Sync + Send {
/// Runs the update main loop
async fn run(&self) -> Result<(), InternalError>;
async fn run(&self, planned_start_time: DateTime<Utc>) -> Result<(), InternalError>;

/// Triggers the specified flow manually, unless it's already waiting
async fn trigger_manual_flow(
&self,
trigger_time: DateTime<Utc>,
flow_key: FlowKey,
initiator_account_id: AccountID,
initiator_account_name: AccountName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,9 @@ impl FlowServiceInMemory {
impl FlowService for FlowServiceInMemory {
/// Runs the update main loop
#[tracing::instrument(level = "info", skip_all)]
async fn run(&self) -> Result<(), InternalError> {
async fn run(&self, planned_start_time: DateTime<Utc>) -> Result<(), InternalError> {
// Initial scheduling
let start_time = self.round_time(self.time_source.now())?;
let start_time = self.round_time(planned_start_time)?;
self.initialize_auto_polling_flows_from_configurations(start_time)
.await?;

Expand Down Expand Up @@ -468,6 +468,7 @@ impl FlowService for FlowServiceInMemory {
)]
async fn trigger_manual_flow(
&self,
trigger_time: DateTime<Utc>,
flow_key: FlowKey,
initiator_account_id: AccountID,
initiator_account_name: AccountName,
Expand All @@ -487,7 +488,7 @@ impl FlowService for FlowServiceInMemory {
// Otherwise, initiate a new flow and activate it at the nearest scheduler slot
None => {
let mut flow = self.make_new_flow(flow_key, trigger).await?;
let activation_time = self.round_time(self.time_source.now())?;
let activation_time = self.round_time(trigger_time)?;
self.enqueue_flow(flow.flow_id, activation_time)?;

flow.activate_at_time(self.time_source.now(), activation_time)
Expand Down
42 changes: 32 additions & 10 deletions src/infra/flow-system-inmem/tests/tests/test_flow_service_inmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Duration, DurationRound, Utc};
use dill::*;
use event_bus::{AsyncEventHandler, EventBus};
use kamu::testing::MetadataFactory;
Expand Down Expand Up @@ -47,8 +47,14 @@ async fn test_read_initial_config_and_queue_properly() {
)
.await;

// Remember start time
let start_time = Utc::now()
.duration_round(Duration::milliseconds(SCHEDULING_ALIGNMENT_MS))
.unwrap();

// Run scheduler concurrently with manual triggers script
let _ = tokio::select! {
res = harness.flow_service.run() => res.int_err(),
res = harness.flow_service.run(start_time) => res.int_err(),
_ = tokio::time::sleep(std::time::Duration::from_millis(60)) => Ok(()),
}
.unwrap();
Expand All @@ -65,7 +71,7 @@ async fn test_read_initial_config_and_queue_properly() {
let foo_moment = state.snapshots[1].0;
let bar_moment = state.snapshots[2].0;

assert!(start_moment < foo_moment && foo_moment < bar_moment);
assert_eq!(start_time, start_moment);
assert_eq!((foo_moment - start_moment), Duration::milliseconds(30)); // planned time for "foo"
assert_eq!((bar_moment - start_moment), Duration::milliseconds(45)); // planned time for "bar"

Expand Down Expand Up @@ -151,18 +157,26 @@ async fn test_manual_trigger() {
let foo_flow_key: FlowKey = FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::Ingest).into();
let bar_flow_key: FlowKey = FlowKeyDataset::new(bar_id.clone(), DatasetFlowType::Ingest).into();

// Remember start time
let start_time = Utc::now()
.duration_round(Duration::milliseconds(SCHEDULING_ALIGNMENT_MS))
.unwrap();

// Run scheduler concurrently with manual triggers script
let _ = tokio::select! {
res = harness.flow_service.run() => res.int_err(),
res = harness.flow_service.run(start_time) => res.int_err(),
_ = async {
// Sleep < "foo" period
tokio::time::sleep(std::time::Duration::from_millis(18)).await;
harness.trigger_manual_flow(foo_flow_key.clone()).await; // "foo" pending already
harness.trigger_manual_flow(bar_flow_key.clone()).await; // "bar" not queued, starts soon
let new_time = start_time + Duration::milliseconds(18);
harness.trigger_manual_flow(new_time, foo_flow_key.clone()).await; // "foo" pending already
harness.trigger_manual_flow(new_time, bar_flow_key.clone()).await; // "bar" not queued, starts soon

// Wake up after foo scheduling
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
harness.trigger_manual_flow(foo_flow_key.clone()).await; // "foo" pending already, even running
harness.trigger_manual_flow(bar_flow_key.clone()).await; // "bar" pending already, event running
let new_time = new_time + Duration::milliseconds(20);
harness.trigger_manual_flow(new_time, foo_flow_key.clone()).await; // "foo" pending already, even running
harness.trigger_manual_flow(new_time, bar_flow_key.clone()).await; // "bar" pending already, event running

// Make sure nothing got scheduled in near time
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
Expand All @@ -180,6 +194,7 @@ async fn test_manual_trigger() {
let bar_moment = state.snapshots[1].0;
let foo_moment = state.snapshots[2].0;

assert_eq!(start_moment, start_time);
assert_eq!((bar_moment - start_moment), Duration::milliseconds(20)); // next slot after 18ms trigger with 5ms align
assert_eq!((foo_moment - start_moment), Duration::milliseconds(30)); // 30ms as planned

Expand Down Expand Up @@ -330,6 +345,10 @@ impl AsyncEventHandler<FlowServiceEventExecutedTimeSlot> for TestFlowSystemListe

/////////////////////////////////////////////////////////////////////////////////////////

const SCHEDULING_ALIGNMENT_MS: i64 = 5;

/////////////////////////////////////////////////////////////////////////////////////////

struct FlowHarness {
_tmp_dir: tempfile::TempDir,
catalog: dill::Catalog,
Expand All @@ -347,7 +366,9 @@ impl FlowHarness {

let catalog = dill::CatalogBuilder::new()
.add::<EventBus>()
.add_value(FlowServiceRunConfig::new(Duration::milliseconds(5)))
.add_value(FlowServiceRunConfig::new(Duration::milliseconds(
SCHEDULING_ALIGNMENT_MS,
)))
.add::<FlowServiceInMemory>()
.add::<FlowEventStoreInMem>()
.add::<FlowConfigurationServiceInMemory>()
Expand Down Expand Up @@ -423,9 +444,10 @@ impl FlowHarness {
task_ids
}

async fn trigger_manual_flow(&self, flow_key: FlowKey) {
async fn trigger_manual_flow(&self, trigger_time: DateTime<Utc>, flow_key: FlowKey) {
self.flow_service
.trigger_manual_flow(
trigger_time,
flow_key,
FAKE_ACCOUNT_ID.to_string(),
AccountName::new_unchecked(auth::DEFAULT_ACCOUNT_NAME),
Expand Down

0 comments on commit 08ea032

Please sign in to comment.