Experiment/dependencies graph (#364)
* Simplest startup job to initialize dependencies graph.
* Removed dependency query from DatasetRepository.
* Integrated dependencies graph into GraphQL queries for upstream/downstream links.
* Integrated dependencies graph into dataset deletion.
* Reacting to `DatasetCreated` events.
* Implemented dependency graph reaction to changes in dataset inputs.
* Implemented lazy vs. eager dependency graph initialization (a usage sketch follows the file summary below).
zaychenko-sergei committed Dec 14, 2023
1 parent 08d536f commit 32ab9e9
Showing 54 changed files with 1,199 additions and 561 deletions.
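
Before the individual diffs, here is a minimal sketch of how the new dependency graph could be seeded eagerly at startup, mirroring the `init_dependencies_graph` test helper further down. It assumes `DependencyGraphServiceInMemory` is already registered in the `dill` catalog (as the test harnesses below do with `.add::<DependencyGraphServiceInMemory>()`); the import path of `DependencyGraphRepositoryInMemory` is an assumption. Without this step, the service falls back to lazy initialization on first access.

use std::sync::Arc;

use kamu::DependencyGraphRepositoryInMemory; // assumed import path
use kamu_core::{DatasetRepository, DependencyGraphService};

// Sketch: seed the in-memory dependency graph once at startup (eager mode)
// by scanning existing datasets through the repository.
async fn init_dependency_graph(catalog: &dill::Catalog) {
    let dataset_repo: Arc<dyn DatasetRepository> =
        catalog.get_one::<dyn DatasetRepository>().unwrap();

    let dependency_graph_service: Arc<dyn DependencyGraphService> =
        catalog.get_one::<dyn DependencyGraphService>().unwrap();

    dependency_graph_service
        .eager_initialization(&DependencyGraphRepositoryInMemory::new(dataset_repo))
        .await
        .unwrap();
}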
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

@@ -14,7 +14,7 @@ use std::sync::Arc;
use dill::Component;
use event_bus::EventBus;
use kamu::testing::MetadataFactory;
use kamu::DatasetRepositoryLocalFs;
use kamu::{DatasetRepositoryLocalFs, DependencyGraphServiceInMemory};
use kamu_adapter_auth_oso::{KamuAuthOso, OsoDatasetAuthorizer};
use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer, DatasetActionUnauthorizedError};
use kamu_core::{AccessError, CurrentAccountSubject, DatasetRepository};
@@ -111,6 +111,7 @@ impl DatasetAuthorizerHarness {
)))
.add::<KamuAuthOso>()
.add::<OsoDatasetAuthorizer>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(datasets_dir)
2 changes: 2 additions & 0 deletions src/adapter/graphql/Cargo.toml
@@ -25,6 +25,7 @@ kamu-data-utils = { workspace = true }
kamu-core = { workspace = true }
kamu-task-system = { workspace = true }
kamu-flow-system = { workspace = true }
event-bus = { workspace = true }

async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"] }
async-trait = { version = "0.1", default-features = false }
@@ -36,6 +37,7 @@ indoc = "2"
serde = "1"
serde_json = "1"
tokio = { version = "1", default-features = false, features = [] }
tokio-stream = { version = "0.1", default-features = false }
tracing = "0.1"
thiserror = { version = "1", default-features = false }

1 change: 1 addition & 0 deletions src/adapter/graphql/src/lib.rs
@@ -10,6 +10,7 @@
#![feature(error_generic_member_access)]
#![feature(error_in_core)]
#![feature(int_roundings)]
#![feature(async_closure)]

pub mod extensions;
pub(crate) mod mutations;
61 changes: 40 additions & 21 deletions src/adapter/graphql/src/queries/datasets/dataset_metadata.rs
@@ -8,7 +8,6 @@
// by the Apache License, Version 2.0.

use chrono::prelude::*;
use futures::TryStreamExt;
use kamu_core::{self as domain, MetadataChainExt, TryStreamExtExt};
use opendatafabric as odf;
use opendatafabric::{AsTypedBlock, VariantOf};
@@ -93,39 +92,59 @@ impl DatasetMetadata {

/// Current upstream dependencies of a dataset
async fn current_upstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let dependency_graph_service =
from_catalog::<dyn domain::DependencyGraphService>(ctx).unwrap();

let dataset = self.get_dataset(ctx).await?;
let summary = dataset
.get_summary(domain::GetSummaryOpts::default())
use tokio_stream::StreamExt;
let upstream_dataset_ids: Vec<_> = dependency_graph_service
.get_upstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?;
.int_err()?
.collect()
.await;

let mut dependencies: Vec<_> = Vec::new();
for input in summary.dependencies.into_iter() {
let dataset_id = input.id.unwrap().clone();
let dataset_handle = dataset_repo
.resolve_dataset_ref(&dataset_id.as_local_ref())
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let mut upstream = Vec::with_capacity(upstream_dataset_ids.len());
for upstream_dataset_id in upstream_dataset_ids {
let hdl = dataset_repo
.resolve_dataset_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()?;
dependencies.push(Dataset::new(
Account::from_dataset_alias(ctx, &dataset_handle.alias),
dataset_handle,
upstream.push(Dataset::new(
Account::from_dataset_alias(ctx, &hdl.alias),
hdl,
));
}
Ok(dependencies)

Ok(upstream)
}

// TODO: Convert to collection
/// Current downstream dependencies of a dataset
async fn current_downstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let dependency_graph_service =
from_catalog::<dyn domain::DependencyGraphService>(ctx).unwrap();

let downstream: Vec<_> = dataset_repo
.get_downstream_dependencies(&self.dataset_handle.as_local_ref())
.map_ok(|hdl| Dataset::new(Account::from_dataset_alias(ctx, &hdl.alias), hdl))
.try_collect()
.await?;
use tokio_stream::StreamExt;
let downstream_dataset_ids: Vec<_> = dependency_graph_service
.get_downstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?
.collect()
.await;

let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let mut downstream = Vec::with_capacity(downstream_dataset_ids.len());
for downstream_dataset_id in downstream_dataset_ids {
let hdl = dataset_repo
.resolve_dataset_ref(&downstream_dataset_id.as_local_ref())
.await
.int_err()?;
downstream.push(Dataset::new(
Account::from_dataset_alias(ctx, &hdl.alias),
hdl,
));
}

Ok(downstream)
}
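
Both resolvers above follow the same two-step pattern: collect dependency IDs from `DependencyGraphService`, then resolve each ID to a handle via `DatasetRepository` and wrap it into a GraphQL `Dataset`. A hypothetical helper (not part of this commit, using the same module-level imports as dataset_metadata.rs above) that captures the shared resolution step might look like this:

// Hypothetical helper capturing the ID-to-Dataset resolution loop shared by
// current_upstream_dependencies and current_downstream_dependencies.
async fn resolve_dependency_datasets(
    ctx: &Context<'_>,
    dependency_ids: Vec<odf::DatasetID>,
) -> Result<Vec<Dataset>> {
    let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();

    let mut datasets = Vec::with_capacity(dependency_ids.len());
    for dependency_id in dependency_ids {
        let hdl = dataset_repo
            .resolve_dataset_ref(&dependency_id.as_local_ref())
            .await
            .int_err()?;
        datasets.push(Dataset::new(
            Account::from_dataset_alias(ctx, &hdl.alias),
            hdl,
        ));
    }

    Ok(datasets)
}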
1 change: 1 addition & 0 deletions src/adapter/graphql/tests/tests/test_error_handling.rs
@@ -61,6 +61,7 @@ async fn test_internal_error() {
.add::<EventBus>()
.add_value(CurrentAccountSubject::new_test())
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.path().join("datasets"))
1 change: 1 addition & 0 deletions src/adapter/graphql/tests/tests/test_gql_data.rs
@@ -25,6 +25,7 @@ use opendatafabric::*;
async fn create_catalog_with_local_workspace(tempdir: &Path) -> dill::Catalog {
dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.join("datasets"))
22 changes: 20 additions & 2 deletions src/adapter/graphql/tests/tests/test_gql_datasets.rs
@@ -377,6 +377,7 @@ async fn dataset_rename_name_collision() {
#[test_log::test(tokio::test)]
async fn dataset_delete_success() {
let harness = GraphQLDatasetsHarness::new();
harness.init_dependencies_graph().await;

let foo_result = harness
.create_root_dataset(DatasetName::new_unchecked("foo"))
@@ -426,6 +427,7 @@ async fn dataset_delete_success() {
#[test_log::test(tokio::test)]
async fn dataset_delete_dangling_ref() {
let harness = GraphQLDatasetsHarness::new();
harness.init_dependencies_graph().await;

let foo_result = harness
.create_root_dataset(DatasetName::new_unchecked("foo"))
@@ -531,7 +533,7 @@ async fn dataset_view_permissions() {

struct GraphQLDatasetsHarness {
_tempdir: tempfile::TempDir,
_base_catalog: dill::Catalog,
base_catalog: dill::Catalog,
catalog_authorized: dill::Catalog,
catalog_anonymous: dill::Catalog,
}
@@ -544,6 +546,7 @@ impl GraphQLDatasetsHarness {

let base_catalog = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(datasets_dir)
@@ -559,12 +562,27 @@

Self {
_tempdir: tempdir,
_base_catalog: base_catalog,
base_catalog,
catalog_anonymous,
catalog_authorized,
}
}

pub async fn init_dependencies_graph(&self) {
let dataset_repo = self
.catalog_authorized
.get_one::<dyn DatasetRepository>()
.unwrap();
let dependency_graph_service = self
.base_catalog
.get_one::<dyn DependencyGraphService>()
.unwrap();
dependency_graph_service
.eager_initialization(&DependencyGraphRepositoryInMemory::new(dataset_repo))
.await
.unwrap();
}

pub async fn create_root_dataset(&self, name: DatasetName) -> CreateDatasetResult {
let dataset_repo = self
.catalog_authorized
2 changes: 2 additions & 0 deletions src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs
@@ -178,6 +178,7 @@ async fn metadata_chain_append_event() {

let base_catalog = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.path().join("datasets"))
@@ -274,6 +275,7 @@ async fn metadata_update_readme_new() {

let base_catalog = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.path().join("datasets"))
1 change: 1 addition & 0 deletions src/adapter/graphql/tests/tests/test_gql_search.rs
@@ -21,6 +21,7 @@ async fn query() {

let cat = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_value(CurrentAccountSubject::new_test())
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
.add_builder(
1 change: 1 addition & 0 deletions src/adapter/http/Cargo.toml
@@ -21,6 +21,7 @@ doctest = false
opendatafabric = { workspace = true }
# TODO: Adapters should depend only on kamu-domain crate and be implementation-agnostic
kamu = { workspace = true }
event-bus = { workspace = true }

axum = { version = "0.6", features = ["ws", "headers"] }
axum-extra = { version = "0.8", features = ["async-read-body"] }
4 changes: 4 additions & 0 deletions src/adapter/http/src/simple_protocol/handlers.rs
@@ -19,6 +19,7 @@
use std::str::FromStr;
use std::sync::Arc;

use event_bus::EventBus;
use kamu::domain::*;
use opendatafabric::serde::flatbuffers::FlatbuffersMetadataBlockSerializer;
use opendatafabric::serde::MetadataBlockSerializer;
@@ -225,9 +226,12 @@ pub async fn dataset_push_ws_upgrade_handler(
Err(err) => Err(err.api_err()),
}?;

let event_bus = catalog.get_one::<EventBus>().unwrap();

Ok(ws.on_upgrade(|socket| {
AxumServerPushProtocolInstance::new(
socket,
event_bus,
dataset_repo,
dataset_ref,
dataset,
25 changes: 24 additions & 1 deletion src/adapter/http/src/smart_protocol/axum_server_push_protocol.rs
@@ -10,12 +10,15 @@
use std::collections::VecDeque;
use std::sync::Arc;

use event_bus::EventBus;
use kamu::domain::events::DatasetEventDependenciesUpdated;
use kamu::domain::{
BlockRef,
CorruptedSourceError,
Dataset,
DatasetRepository,
ErrorIntoInternal,
GetSummaryOpts,
ResultIntoInternal,
};
use opendatafabric::{AsTypedBlock, DatasetRef, MetadataBlock, Multihash};
@@ -36,6 +39,7 @@ const MIN_UPLOAD_PROGRESS_PING_DELAY_SEC: u64 = 10;

pub struct AxumServerPushProtocolInstance {
socket: axum::extract::ws::WebSocket,
event_bus: Arc<EventBus>,
dataset_repo: Arc<dyn DatasetRepository>,
dataset_ref: DatasetRef,
dataset: Option<Arc<dyn Dataset>>,
@@ -46,6 +50,7 @@ pub struct AxumServerPushProtocolInstance {
impl AxumServerPushProtocolInstance {
pub fn new(
socket: axum::extract::ws::WebSocket,
event_bus: Arc<EventBus>,
dataset_repo: Arc<dyn DatasetRepository>,
dataset_ref: DatasetRef,
dataset: Option<Arc<dyn Dataset>>,
@@ -54,6 +59,7 @@ ) -> Self {
) -> Self {
Self {
socket,
event_bus,
dataset_repo,
dataset_ref,
dataset,
@@ -314,12 +320,29 @@ impl AxumServerPushProtocolInstance {
tracing::debug!("Push client sent a complete request. Commiting the dataset");

if new_blocks.len() > 0 {
dataset_append_metadata(self.dataset.as_ref().unwrap().as_ref(), new_blocks)
let dataset = self.dataset.as_ref().unwrap().as_ref();
let response = dataset_append_metadata(dataset, new_blocks)
.await
.map_err(|e| {
tracing::debug!("Appending dataset metadata failed with error: {}", e);
PushServerError::Internal(e.int_err())
})?;

// TODO: encapsulate this inside dataset/chain
if !response.new_upstream_ids.is_empty() {
let summary = dataset
.get_summary(GetSummaryOpts::default())
.await
.int_err()?;

self.event_bus
.dispatch_event(DatasetEventDependenciesUpdated {
dataset_id: summary.id.clone(),
new_upstream_ids: response.new_upstream_ids,
})
.await
.int_err()?;
}
}

tracing::debug!("Sending completion confirmation");
29 changes: 27 additions & 2 deletions src/adapter/http/src/smart_protocol/protocol_dataset_helper.rs
@@ -194,16 +194,41 @@ pub async fn decode_metadata_batch(

/////////////////////////////////////////////////////////////////////////////////////////

pub struct AppendMetadataResponse {
pub new_upstream_ids: Vec<opendatafabric::DatasetID>,
}

pub async fn dataset_append_metadata(
dataset: &dyn Dataset,
metadata: VecDeque<(Multihash, MetadataBlock)>,
) -> Result<(), AppendError> {
) -> Result<AppendMetadataResponse, AppendError> {
let old_head = metadata.front().unwrap().1.prev_block_hash.clone();
let new_head = metadata.back().unwrap().0.clone();

let metadata_chain = dataset.as_metadata_chain();

let mut new_upstream_ids: Vec<opendatafabric::DatasetID> = vec![];

for (hash, block) in metadata {
tracing::debug!(sequence_number = %block.sequence_number, hash = %hash, "Appending block");

if let opendatafabric::MetadataEvent::SetTransform(transform) = &block.event {
// Collect only the latest upstream dataset IDs
new_upstream_ids.clear();
for new_input in transform.inputs.iter() {
if let Some(id) = &new_input.id {
new_upstream_ids.push(id.clone());
} else {
return Err(AppendError::InvalidBlock(
AppendValidationError::InvalidEvent(InvalidEventError::new(
block.event,
"Transform input with unresolved ID",
)),
));
}
}
}

metadata_chain
.append(
block,
Expand All @@ -227,7 +252,7 @@ pub async fn dataset_append_metadata(
)
.await?;

Ok(())
Ok(AppendMetadataResponse { new_upstream_ids })
}

/////////////////////////////////////////////////////////////////////////////////////////
(Diffs for the remaining changed files were not loaded.)