Experiment/dataset update flow #292
Conversation
Resolved review threads (outdated):
- src/domain/dataset-update-flow/src/entities/update/update_trigger.rs
- src/domain/dataset-update-flow/src/entities/schedule/schedule_type.rs
- src/domain/dataset-update-flow/src/entities/update/update_event.rs (two threads)
- src/domain/dataset-update-flow/src/entities/update/update_delay_reason.rs
Force-pushed from bbe8ed0 to 051713a.
Force-pushed from 9d447cb to 3338514.
Resolved review threads (outdated):
- src/domain/dataset-update-flow/src/entities/schedule/schedule.rs
- src/domain/dataset-update-flow/src/entities/schedule/update_schedule_event.rs
- src/domain/dataset-update-flow/src/entities/schedule/update_schedule_state.rs (two threads)
- src/domain/dataset-update-flow/src/entities/update/update_event.rs
- src/domain/dataset-update-flow/src/aggregates/update_schedule.rs
```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlowStartConditionBatching {
    pub available_records: usize,
    // ...
```
Review comment:
Keeping this field up to date will require a lot of writes to the event store. Imagine a device that pushes data to Kafka every second: we would have to update this state in the flow on every push. Since this type is a "start condition", not the state of a batch, I suggest we remove this field. If we really want to show the state of a batch, we'll add a separate API to query it from Kafka or elsewhere when needed.
Reply:
This isn't used yet. I removed it, but we will need some source of truth in this regard.
Following the IoT device example, we would still have to create a secondary trigger event every second and show each one in the History view.
Currently this trigger is an empty struct:
```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowTriggerPush {
    // TODO: source (HTTP, MQTT, CMD, ...)
}
```
Similarly, we don't yet store the number of records added in the flow outcome, so we can't use it yet for derived dataset throttling:
```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowTriggerInputDatasetFlow {
    pub input_dataset_id: DatasetID,
    pub input_flow_type: DatasetFlowType,
    pub input_flow_id: FlowID,
}
```
Perhaps both of these structures should carry information about the offset change. The sum of those numbers might be the criterion we are looking for.
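For illustration only, one possible shape of that idea (the `offset_interval` field and the `OffsetInterval` helper are assumptions, not something this PR defines):

```rust
// Purely hypothetical sketch: both trigger types carry the range of offsets
// they contributed, so the flow system can sum record counts per flow.
// DatasetID, DatasetFlowType, and FlowID come from the surrounding crates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetInterval {
    pub start: u64,
    pub end: u64,
}

impl OffsetInterval {
    // Number of records covered, assuming inclusive bounds.
    pub fn num_records(&self) -> u64 {
        self.end - self.start + 1
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowTriggerPush {
    // TODO: source (HTTP, MQTT, CMD, ...)
    pub offset_interval: OffsetInterval,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowTriggerInputDatasetFlow {
    pub input_dataset_id: DatasetID,
    pub input_flow_type: DatasetFlowType,
    pub input_flow_id: FlowID,
    pub offset_interval: OffsetInterval,
}
```

The batching start condition could then compare the sum of `num_records()` over a flow's accumulated triggers against the configured batch size.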
Reply:
Hm, I was imagining it differently:
- The Kafka controller will know about the batching configuration.
- When the Kafka queue becomes non-empty, it will trigger the flow (to show users that a batch is pending).
- While more records are flowing in, it will NOT touch the flow system until the configured batch size is reached, at which point it finally triggers the flow.

This way secondary trigger events will appear only for other trigger types, e.g. "manual". While the flow is already waiting for a batch, I don't think showing more data arriving as secondary triggers is useful, and it could result in too many updates.
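A minimal sketch of that gate on the Kafka controller side (everything here, including the `BatchingGate` name, is illustrative; the PR defines no such type):

```rust
// Hypothetical batching gate for the Kafka controller described above.
// The flow system is only notified twice per batch: once when the first
// record arrives, and once when the batch is full.
struct BatchingGate {
    batch_size: usize,
    accumulated: usize,
    flow_pending: bool,
}

impl BatchingGate {
    /// Called on every record arriving in the Kafka queue.
    /// Returns `true` when the flow system should be notified.
    fn on_record(&mut self) -> bool {
        self.accumulated += 1;
        if !self.flow_pending {
            // Queue became non-empty: trigger the flow so users
            // can see that a batch is pending.
            self.flow_pending = true;
            return true;
        }
        if self.accumulated >= self.batch_size {
            // Batch is full: trigger again so the flow actually runs.
            self.accumulated = 0;
            self.flow_pending = false;
            return true;
        }
        // Intermediate records never touch the flow system.
        false
    }
}
```

With a batch size of 25,000, the flow system would see exactly two events per batch instead of 25,000 secondary triggers.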
Reply:
Assuming this suggestion, without visibility into Kafka's queue state (say, showing something like "accumulated 17,245/25,000 records"), the user will not understand what the flow is waiting for.
Resolved review thread (outdated):
- src/domain/dataset-update-flow/src/entities/flow/flow_trigger.rs
```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StartConditionConfiguration {
    pub throttling_period: Option<Duration>,
    // ...
```
Review comment:
Is this throttling config related to:
- "don't update free datasets more often than every 30 min",
- or "after the first push creates a flow, keep batching data for 10 more minutes before running the ingest task"?

I feel like the former should be a system-wide setting, not something we store per flow. If it's the latter, we should give it a different name, e.g. "batching period".
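Illustrating the split between those two meanings (neither type below exists in the PR; the names are invented):

```rust
use std::time::Duration;

// System-wide setting, not stored per flow:
// "don't update free datasets more often than every 30 min".
pub struct SystemThrottlingConfig {
    pub min_dataset_update_interval: Duration,
}

// Per-flow setting: "after the first push creates a flow, keep batching
// for a while before running the ingest task".
pub struct StartConditionConfiguration {
    pub batching_period: Option<Duration>,
}
```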
Resolved review threads (outdated):
- src/domain/dataset-update-flow/src/entities/shared/system_flow_type.rs
- src/domain/dataset-update-flow/src/services/flow/flow_service.rs
- src/infra/dataset-update-flow-inmem/src/services/dependency_graph_service_inmem.rs
```rust
async fn schedule_flow_task(&self, flow: &mut Flow) -> Result<(), InternalError> {
    let logical_plan = match &flow.flow_key {
        FlowKey::Dataset(flow_key) => match flow_key.flow_type {
            DatasetFlowType::Update => LogicalPlan::UpdateDataset(UpdateDataset {
                // ...
```
Review comment:
Should we have something like `Probe` for the flow system too, so we could test it manually without spawning expensive tasks? Perhaps a `Probe` flow could run `Probe` tasks on a certain dataset and spawn `Probe` flows for everything downstream, like a cascading update, but where the actual work is a no-op?
Reply:
I would likely introduce something like this when testing.
Reply:
There is no mechanism to pass any parameters to a flow yet. This would be necessary for the `Probe` idea to make sense.
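A hypothetical shape for such a plan, mirroring the `DatasetFlowType::Update` arm in the excerpt above (the `Probe` variant, its fields, and the extra match arm are all invented for illustration):

```rust
// Invented for illustration: a no-op task plan that exercises the flow
// machinery without doing real work. DatasetID comes from the domain crates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Probe {
    pub dataset_id: Option<DatasetID>,
    // How long the task should pretend to be busy, if at all.
    pub busy_time: Option<std::time::Duration>,
}

// An extra arm in schedule_flow_task could then map a probe flow to it:
// DatasetFlowType::Probe => LogicalPlan::Probe(Probe {
//     dataset_id: Some(flow_key.dataset_id.clone()),
//     busy_time: None,
// }),
```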
Resolved review threads (outdated):
- src/infra/dataset-update-flow-inmem/src/services/flow/flow_service_inmem.rs (two threads)
Force-pushed from 56cfbd6 to 91fda61.
Review comment:
Some minor suggestions, but overall I'm really happy with the state of this PR 👍
Resolved review threads (outdated):
- src/domain/flow-system/src/entities/shared/dataset_flow_type.rs (two threads)
Force-pushed from 089747d to 5328f84.
Commit messages in this range:
- DatasetUpdateFlow => UpdateSchedule. Added Update aggregate, representing a single instance of the update process for a given dataset.
- Formatter fix
- Review: renamings
- Review: externalized time source for event-sourcing aggregates
- Review: compacted task events in update flow
- Review: added update cancellation (before tasks are scheduled)
- Review: accepted suggestions to name delay reasons as start conditions, and secondary triggers as simply adding triggers
- Merge corrections
- In-memory implementation of repositories
- Drafted update scheduler service. ES: support optional aggregate loads.
- Sketched `UpdateService` without task scheduling yet. Scheduler steps:
  - schedule update task on manual trigger
  - read all active auto-schedules at the beginning of the run process
  - react on schedule change events: update the table of active schedules
- Separate set in each in-memory event repository: quick return of query objects
- Drafted Time Wheel concept
- Connected time wheel and update service: initial scheduling and run loop
- Drafted in-memory dependency graph service (based on the petgraph library)
- Scheduling downstream datasets when a dataset update completes, respecting the throttling period logic
- Enqueue next auto-polling root dataset update when the current update succeeds
- Prototyped EventBus + added 1st demo link for schedule modification event
- Minor event structure fixes
- Simplified UpdateSchedule events
- Connected task finish and dataset removal events. Large DI changes in existing tests to support the EventBus dependency
- Concurrent execution of event handlers in dispatcher
- Converted event bus handlers to traits. Registering handlers in the catalog.
- Merge corrections
- Review: renamed DatasetDeleted event
- Review: avoid excessive event cloning
- Shifted down `get_queries` to the update schedule's event store only
- Async event handler combiner now collects all handler results before reducing the error for reporting
- Resolved basic code review notes
- Added `get_last_update` by dataset operation
- Formatter fixed
- dill 0.8: replaced `builder_for` with `Component::builder()`
- Integrated the `dill::interface` feature and removed many explicit binds
- Review: renamed task event classes, enum for dataset events
- Review: reworked relevance of update schedule, using statuses instead (active, paused, stopped)
- Review: allow update schedules to be re-added after dataset reincarnation with the same ID
- Review: a few TODOs on performance improvements
- Review: reimplemented TimeWheel using a binary heap
- Review: removed `pause` and `resume` methods in `UpdateSchedule` aggregate, use `set_schedule` only
- Renamed update schedules => update configurations
- Separated schedules and start conditions in update configurations
- Generalized dataset flow configurations
- System vs Dataset flow configurations
- Not very smart, but a model of System and Dataset flows. Scheduling service largely not implemented yet.
- Generic-based flow events, state
- Refactored flow configurations aggregate to generic events/state similarly
- Code reuse approach for flow/flow-config aggregates based on trait extensions
- Attempts to generalize flow configuration services (at least traits). Folder reorganization in interface and in-mem crates.
- Implemented generic in-memory event stores and integrated them into all current aggregates
- Implemented SystemFlow in-memory repository
- Implemented in-memory Flow service for all kinds of flows
- Decomposing Flow service: extracted ActiveConfigsState
- Decomposing Flow service: extracted PendingFlowsState
- Compacted DatasetFlow & SystemFlow into Flow
- Similarly compacted FlowConfiguration aggregate
- Simplifications in FlowService
- Review: 'flow-system' and 'flow-system-inmem' are final names
- Review: improved enum all-value iteration methods in flow types
- Review: specific => of_type
- Review: removed duplicate OwnedDatasetFlowKey
- Review: killed redundant feature flags
- Review: tracing without formatting
- Moved `DependencyGraphService` to core domain
- Review: removed redundant field
- Experiment/dependencies graph (#364):
  - Simplest startup job to initialize the dependencies graph.
  - Removed dependency query from DatasetRepository.
  - Integrated dependencies graph into GraphQL queries for upstream/downstream links.
  - Integrated dependencies graph into dataset deletion.
  - Reacting on `DatasetCreated` events.
  - Implemented reaction of dependencies graph on changes in dataset inputs.
  - Implemented lazy vs eager dependencies initialization.
- Merge corrections
- Test fix
- Review: minor renamings
Force-pushed from 41a924e to 08ea032.
Commit messages in this range:
- Flow itself needs an Aborted outcome.
- Explicit model of flow abortion. More deterministic time propagation of flow config events.
- … update on success. Removed retry on failure logic in flow aggregate.
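Reading "Flow itself needs an Aborted outcome" above, the outcome enum presumably ends up with a shape along these lines (a sketch, not the PR's exact definition; the variant names are assumptions):

```rust
// Sketch of a flow outcome with an explicit aborted state, as the commit
// message above suggests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FlowOutcome {
    Success,
    Failed,
    Aborted,
}
```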