Experiment/dependencies graph #364

Merged
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

@@ -14,7 +14,7 @@ use std::sync::Arc;
use dill::Component;
use event_bus::EventBus;
use kamu::testing::MetadataFactory;
use kamu::DatasetRepositoryLocalFs;
use kamu::{DatasetRepositoryLocalFs, DependencyGraphServiceInMemory};
use kamu_adapter_auth_oso::{KamuAuthOso, OsoDatasetAuthorizer};
use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer, DatasetActionUnauthorizedError};
use kamu_core::{AccessError, CurrentAccountSubject, DatasetRepository};
@@ -111,6 +111,7 @@ impl DatasetAuthorizerHarness {
)))
.add::<KamuAuthOso>()
.add::<OsoDatasetAuthorizer>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(datasets_dir)
2 changes: 2 additions & 0 deletions src/adapter/graphql/Cargo.toml
@@ -25,6 +25,7 @@ kamu-data-utils = { workspace = true }
kamu-core = { workspace = true }
kamu-task-system = { workspace = true }
kamu-flow-system = { workspace = true }
event-bus = { workspace = true }

async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"] }
async-trait = { version = "0.1", default-features = false }
@@ -36,6 +37,7 @@ indoc = "2"
serde = "1"
serde_json = "1"
tokio = { version = "1", default-features = false, features = [] }
tokio-stream = { version = "0.1", default-features = false }
tracing = "0.1"
thiserror = { version = "1", default-features = false }

1 change: 1 addition & 0 deletions src/adapter/graphql/src/lib.rs
@@ -10,6 +10,7 @@
#![feature(error_generic_member_access)]
#![feature(error_in_core)]
#![feature(int_roundings)]
#![feature(async_closure)]

pub mod extensions;
pub(crate) mod mutations;
44 changes: 44 additions & 0 deletions src/adapter/graphql/src/mutations/metadata_chain_mut.rs
@@ -7,6 +7,9 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use domain::events::DatasetEventDependenciesUpdated;
use domain::DatasetRepository;
use event_bus::EventBus;
use kamu_core::{self as domain};
use opendatafabric as odf;

@@ -80,6 +83,35 @@ impl MetadataChainMut {
}
};

use domain::DatasetRepositoryExt;
let dataset_repo = from_catalog::<dyn DatasetRepository>(ctx).unwrap();

let mut new_upstream_ids: Vec<opendatafabric::DatasetID> = vec![];

if let opendatafabric::MetadataEvent::SetTransform(transform) = &event {
for new_input in transform.inputs.iter() {
if let Some(id) = &new_input.id {
new_upstream_ids.push(id.clone());
} else if let Some(dataset_ref_any) = &new_input.dataset_ref {
let maybe_hdl = dataset_repo
.try_resolve_dataset_ref_any(dataset_ref_any)
.await
.int_err()?;
if let Some(hdl) = maybe_hdl {
new_upstream_ids.push(hdl.id);
}
} else {
let local_ref = opendatafabric::DatasetAlias::new(None, new_input.name.clone())
.as_local_ref();
let hdl = dataset_repo
.resolve_dataset_ref(&local_ref)
.await
.int_err()?;
new_upstream_ids.push(hdl.id);
}
}
}

let dataset = self.get_dataset(ctx).await?;

let result = match dataset
@@ -103,6 +135,18 @@ impl MetadataChainMut {
Err(e @ domain::CommitError::Internal(_)) => return Err(e.int_err().into()),
};

// TODO: encapsulate this inside dataset/chain
if !new_upstream_ids.is_empty() {
let event_bus = from_catalog::<EventBus>(ctx).unwrap();
event_bus
.dispatch_event(DatasetEventDependenciesUpdated {
dataset_id: self.dataset_handle.id.clone(),
new_upstream_ids,
})
.await
.int_err()?;
}

Ok(result)
}
}
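Reviewer note: the core of the change above is twofold — before the commit, `SetTransform` inputs are resolved to stable dataset IDs via the repository; after a successful commit, a `DatasetEventDependenciesUpdated` event is dispatched so the dependency graph can react. The sketch below restates the input-resolution branches as a free-standing helper for readability; the helper, its signature, and the exact import paths (e.g. for `InternalError` / `int_err`) are illustrative assumptions, not code from this PR.

// Sketch only: restates the input-resolution logic added above as a standalone helper.
// The helper and these import paths are assumptions for illustration — the PR does this inline.
use internal_error::{InternalError, ResultIntoInternal};
use kamu_core::{DatasetRepository, DatasetRepositoryExt};
use opendatafabric as odf;

async fn collect_new_upstream_ids(
    dataset_repo: &dyn DatasetRepository,
    transform: &odf::SetTransform,
) -> Result<Vec<odf::DatasetID>, InternalError> {
    let mut new_upstream_ids = Vec::new();
    for input in transform.inputs.iter() {
        if let Some(id) = &input.id {
            // Input already carries a stable dataset ID — use it directly
            new_upstream_ids.push(id.clone());
        } else if let Some(dataset_ref_any) = &input.dataset_ref {
            // Best-effort resolution of an arbitrary reference; unresolved refs are skipped
            if let Some(hdl) = dataset_repo
                .try_resolve_dataset_ref_any(dataset_ref_any)
                .await
                .int_err()?
            {
                new_upstream_ids.push(hdl.id);
            }
        } else {
            // Fall back to resolving the input by its local alias
            let local_ref = odf::DatasetAlias::new(None, input.name.clone()).as_local_ref();
            let hdl = dataset_repo.resolve_dataset_ref(&local_ref).await.int_err()?;
            new_upstream_ids.push(hdl.id);
        }
    }
    Ok(new_upstream_ids)
}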
61 changes: 40 additions & 21 deletions src/adapter/graphql/src/queries/datasets/dataset_metadata.rs
@@ -8,7 +8,6 @@
// by the Apache License, Version 2.0.

use chrono::prelude::*;
use futures::TryStreamExt;
use kamu_core::{self as domain, MetadataChainExt, TryStreamExtExt};
use opendatafabric as odf;
use opendatafabric::{AsTypedBlock, VariantOf};
@@ -93,39 +92,59 @@ impl DatasetMetadata {

/// Current upstream dependencies of a dataset
async fn current_upstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let dependency_graph_service =
from_catalog::<dyn domain::DependencyGraphService>(ctx).unwrap();

let dataset = self.get_dataset(ctx).await?;
let summary = dataset
.get_summary(domain::GetSummaryOpts::default())
use tokio_stream::StreamExt;
let upstream_dataset_ids: Vec<_> = dependency_graph_service
.get_upstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?;
.int_err()?
.collect()
.await;

let mut dependencies: Vec<_> = Vec::new();
for input in summary.dependencies.into_iter() {
let dataset_id = input.id.unwrap().clone();
let dataset_handle = dataset_repo
.resolve_dataset_ref(&dataset_id.as_local_ref())
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let mut upstream = Vec::with_capacity(upstream_dataset_ids.len());
for upstream_dataset_id in upstream_dataset_ids {
let hdl = dataset_repo
.resolve_dataset_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()?;
dependencies.push(Dataset::new(
Account::from_dataset_alias(ctx, &dataset_handle.alias),
dataset_handle,
upstream.push(Dataset::new(
Account::from_dataset_alias(ctx, &hdl.alias),
hdl,
));
}
Ok(dependencies)

Ok(upstream)
}

// TODO: Convert to collection
/// Current downstream dependencies of a dataset
async fn current_downstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let dependency_graph_service =
from_catalog::<dyn domain::DependencyGraphService>(ctx).unwrap();

let downstream: Vec<_> = dataset_repo
.get_downstream_dependencies(&self.dataset_handle.as_local_ref())
.map_ok(|hdl| Dataset::new(Account::from_dataset_alias(ctx, &hdl.alias), hdl))
.try_collect()
.await?;
use tokio_stream::StreamExt;
let downstream_dataset_ids: Vec<_> = dependency_graph_service
.get_downstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?
.collect()
.await;

let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let mut downstream = Vec::with_capacity(downstream_dataset_ids.len());
for downstream_dataset_id in downstream_dataset_ids {
let hdl = dataset_repo
.resolve_dataset_ref(&downstream_dataset_id.as_local_ref())
.await
.int_err()?;
downstream.push(Dataset::new(
Account::from_dataset_alias(ctx, &hdl.alias),
hdl,
));
}

Ok(downstream)
}
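Reviewer note: both resolvers above now follow the same two-step pattern — ask `DependencyGraphService` for a stream of dependency IDs, then resolve each ID to a handle through the repository. A condensed sketch of that shared pattern follows; the free-standing helper and import paths are assumptions, not part of the PR.

// Sketch only: the shared pattern behind both dependency resolvers above.
use internal_error::{InternalError, ResultIntoInternal};
use kamu_core::{DatasetRepository, DependencyGraphService};
use opendatafabric as odf;
use tokio_stream::StreamExt;

async fn upstream_handles(
    graph: &dyn DependencyGraphService,
    repo: &dyn DatasetRepository,
    dataset_id: &odf::DatasetID,
) -> Result<Vec<odf::DatasetHandle>, InternalError> {
    // The graph service yields upstream dataset IDs as a stream
    let upstream_ids: Vec<_> = graph
        .get_upstream_dependencies(dataset_id)
        .await
        .int_err()?
        .collect()
        .await;

    // Each ID is then resolved to a handle (ID + alias) via the repository
    let mut handles = Vec::with_capacity(upstream_ids.len());
    for id in upstream_ids {
        let hdl = repo.resolve_dataset_ref(&id.as_local_ref()).await.int_err()?;
        handles.push(hdl);
    }
    Ok(handles)
}

The downstream variant is identical except that it calls `get_downstream_dependencies`.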
1 change: 1 addition & 0 deletions src/adapter/graphql/tests/tests/test_error_handling.rs
@@ -61,6 +61,7 @@ async fn test_internal_error() {
.add::<EventBus>()
.add_value(CurrentAccountSubject::new_test())
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.path().join("datasets"))
1 change: 1 addition & 0 deletions src/adapter/graphql/tests/tests/test_gql_data.rs
@@ -25,6 +25,7 @@ use opendatafabric::*;
async fn create_catalog_with_local_workspace(tempdir: &Path) -> dill::Catalog {
dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.join("datasets"))
22 changes: 20 additions & 2 deletions src/adapter/graphql/tests/tests/test_gql_datasets.rs
@@ -377,6 +377,7 @@ async fn dataset_rename_name_collision() {
#[test_log::test(tokio::test)]
async fn dataset_delete_success() {
let harness = GraphQLDatasetsHarness::new();
harness.init_dependencies_graph().await;

let foo_result = harness
.create_root_dataset(DatasetName::new_unchecked("foo"))
@@ -426,6 +427,7 @@ async fn dataset_delete_success() {
#[test_log::test(tokio::test)]
async fn dataset_delete_dangling_ref() {
let harness = GraphQLDatasetsHarness::new();
harness.init_dependencies_graph().await;

let foo_result = harness
.create_root_dataset(DatasetName::new_unchecked("foo"))
@@ -531,7 +533,7 @@ async fn dataset_view_permissions() {

struct GraphQLDatasetsHarness {
_tempdir: tempfile::TempDir,
_base_catalog: dill::Catalog,
base_catalog: dill::Catalog,
catalog_authorized: dill::Catalog,
catalog_anonymous: dill::Catalog,
}
@@ -544,6 +546,7 @@ impl GraphQLDatasetsHarness {

let base_catalog = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(datasets_dir)
@@ -559,12 +562,27 @@ impl GraphQLDatasetsHarness {

Self {
_tempdir: tempdir,
_base_catalog: base_catalog,
base_catalog,
catalog_anonymous,
catalog_authorized,
}
}

pub async fn init_dependencies_graph(&self) {
let dataset_repo = self
.catalog_authorized
.get_one::<dyn DatasetRepository>()
.unwrap();
let dependency_graph_service = self
.base_catalog
.get_one::<dyn DependencyGraphService>()
.unwrap();
dependency_graph_service
.eager_initialization(&DependencyGraphServiceInitializer::new(dataset_repo))
.await
.unwrap();
}

pub async fn create_root_dataset(&self, name: DatasetName) -> CreateDatasetResult {
let dataset_repo = self
.catalog_authorized
2 changes: 2 additions & 0 deletions src/adapter/graphql/tests/tests/test_gql_metadata_chain.rs
@@ -29,6 +29,7 @@ async fn metadata_chain_append_event() {

let base_catalog = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.path().join("datasets"))
@@ -125,6 +126,7 @@ async fn metadata_update_readme_new() {

let base_catalog = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_builder(
DatasetRepositoryLocalFs::builder()
.with_root(tempdir.path().join("datasets"))
1 change: 1 addition & 0 deletions src/adapter/graphql/tests/tests/test_gql_search.rs
@@ -21,6 +21,7 @@ async fn query() {

let cat = dill::CatalogBuilder::new()
.add::<EventBus>()
.add::<DependencyGraphServiceInMemory>()
.add_value(CurrentAccountSubject::new_test())
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
.add_builder(
1 change: 1 addition & 0 deletions src/adapter/http/Cargo.toml
@@ -21,6 +21,7 @@ doctest = false
opendatafabric = { workspace = true }
# TODO: Adapters should depend only on kamu-domain crate and be implementation-agnostic
kamu = { workspace = true }
event-bus = { workspace = true }

axum = { version = "0.6", features = ["ws", "headers"] }
axum-extra = { version = "0.8", features = ["async-read-body"] }
4 changes: 4 additions & 0 deletions src/adapter/http/src/simple_protocol/handlers.rs
@@ -19,6 +19,7 @@
use std::str::FromStr;
use std::sync::Arc;

use event_bus::EventBus;
use kamu::domain::*;
use opendatafabric::serde::flatbuffers::FlatbuffersMetadataBlockSerializer;
use opendatafabric::serde::MetadataBlockSerializer;
@@ -225,9 +226,12 @@ pub async fn dataset_push_ws_upgrade_handler(
Err(err) => Err(err.api_err()),
}?;

let event_bus = catalog.get_one::<EventBus>().unwrap();

Ok(ws.on_upgrade(|socket| {
AxumServerPushProtocolInstance::new(
socket,
event_bus,
dataset_repo,
dataset_ref,
dataset,