Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move more types from dora-core to dora-message to avoid dependency #711

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use arrow::array::Array;
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
descriptor::Descriptor,
metadata::ArrowTypeInfoExt,
topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
uhlc,
};
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor};
use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt};
use dora_message::cli_to_coordinator::ControlRequest;
use dora_message::common::LogMessage;
use dora_message::coordinator_to_cli::ControlRequestReply;
Expand Down Expand Up @@ -35,7 +35,7 @@

let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?

Check warning on line 38 in binaries/cli/src/attach.rs

View workflow job for this annotation

GitHub Actions / Typos

"canoncialize" should be "canonicalize".
.parent()
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dora_core::{
config::OperatorId,
descriptor::{Descriptor, SINGLE_OPERATOR_DEFAULT_ID},
descriptor::{Descriptor, DescriptorExt, NodeExt, SINGLE_OPERATOR_DEFAULT_ID},
};
use eyre::{eyre, Context};
use std::{path::Path, process::Command};
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/graph/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fs::File, io::Write, path::Path};

use dora_core::descriptor::Descriptor;
use dora_core::descriptor::{Descriptor, DescriptorExt};
use eyre::Context;

const MERMAID_TEMPLATE: &str = include_str!("mermaid-template.html");
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use colored::Colorize;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_core::{
descriptor::{source_is_url, Descriptor},
descriptor::{source_is_url, Descriptor, DescriptorExt},
topics::{
DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT,
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
Expand Down
2 changes: 1 addition & 1 deletion binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
descriptor::{Descriptor, ResolvedNode},
uhlc::{self, HLC},
};
use dora_message::{
Expand All @@ -16,6 +15,7 @@ use dora_message::{
},
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
descriptor::{Descriptor, ResolvedNode},
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
Expand Down
6 changes: 2 additions & 4 deletions binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ use crate::{
DaemonConnection,
};

use dora_core::{
descriptor::{Descriptor, ResolvedNode},
uhlc::HLC,
};
use dora_core::{descriptor::DescriptorExt, uhlc::HLC};
use dora_message::{
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped},
daemon_to_coordinator::DaemonCoordinatorReply,
descriptor::{Descriptor, ResolvedNode},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
Expand Down
76 changes: 71 additions & 5 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
config::{DataId, Input, InputMapping, NodeId, OperatorId},
descriptor::{runtime_node_inputs, CoreNodeKind, Descriptor, ResolvedNode},
config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
descriptor::{
read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
DYNAMIC_SOURCE,
},
topics::LOCALHOST,
uhlc::{self, HLC},
};
Expand All @@ -20,7 +23,7 @@
node_to_daemon::{DynamicNodeEvent, Timestamped},
DataflowId,
};
use dora_node_api::Parameter;
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
Expand Down Expand Up @@ -157,12 +160,12 @@
pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?

Check warning on line 163 in binaries/daemon/src/lib.rs

View workflow job for this annotation

GitHub Actions / Typos

"canoncialize" should be "canonicalize".
.parent()
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();

let descriptor = Descriptor::read(dataflow_path).await?;
let descriptor = read_as_descriptor(dataflow_path).await?;
descriptor.check(&working_dir)?;
let nodes = descriptor.resolve_aliases_and_set_defaults()?;

Expand Down Expand Up @@ -1565,7 +1568,7 @@

let metadata = metadata::Metadata::from_parameters(
hlc.new_timestamp(),
ArrowTypeInfo::empty(),
empty_type_info(),
parameters,
);

Expand Down Expand Up @@ -1672,6 +1675,18 @@
}
}

/// Returns an `ArrowTypeInfo` describing an empty, `Null`-typed Arrow array:
/// zero length, no validity bitmap, no buffers, no children.
///
/// Used as placeholder metadata for messages that carry no data payload
/// (callers previously used `ArrowTypeInfo::empty()`; this local helper
/// replaces it after the type moved to `dora-message`).
fn empty_type_info() -> ArrowTypeInfo {
    ArrowTypeInfo {
        data_type: DataType::Null,
        len: 0,
        null_count: 0,
        validity: None,
        offset: 0,
        buffer_offsets: Vec::new(),
        child_data: Vec::new(),
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);
Expand Down Expand Up @@ -1821,3 +1836,54 @@
self.caused_by.entry(affected_node).or_insert(causing_node);
}
}

/// Collects the inputs of every operator in a runtime node into one map,
/// namespacing each input id as `"<operator id>/<input id>"` so ids from
/// different operators cannot collide.
fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
    let mut inputs = BTreeMap::new();
    for operator in &n.operators {
        for (input_id, mapping) in &operator.config.inputs {
            let namespaced_id = DataId::from(format!("{}/{input_id}", operator.id));
            inputs.insert(namespaced_id, mapping.clone());
        }
    }
    inputs
}

/// Collects the outputs of every operator in a runtime node into one set,
/// namespacing each output id as `"<operator id>/<output id>"` so ids from
/// different operators cannot collide.
fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
    let mut outputs = BTreeSet::new();
    for operator in &n.operators {
        for output_id in &operator.config.outputs {
            let namespaced_id = DataId::from(format!("{}/{output_id}", operator.id));
            outputs.insert(namespaced_id);
        }
    }
    outputs
}

/// Daemon-local convenience accessors over `CoreNodeKind`, bridging the gap
/// left by moving the descriptor types into `dora-message`.
trait CoreNodeKindExt {
    /// Returns the node's effective run configuration (its inputs and outputs).
    fn run_config(&self) -> NodeRunConfig;
    /// Returns `true` if the node is started dynamically, i.e. its source
    /// equals `DYNAMIC_SOURCE`; runtime nodes are never dynamic.
    fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
    /// Builds the effective run configuration for this node kind.
    fn run_config(&self) -> NodeRunConfig {
        match self {
            // Custom nodes carry their run configuration directly.
            CoreNodeKind::Custom(custom) => custom.run_config.clone(),
            // Runtime nodes aggregate the namespaced inputs/outputs of all
            // their operators.
            CoreNodeKind::Runtime(runtime) => NodeRunConfig {
                inputs: runtime_node_inputs(runtime),
                outputs: runtime_node_outputs(runtime),
            },
        }
    }

    /// A node is dynamic exactly when it is a custom node whose source is the
    /// special `DYNAMIC_SOURCE` marker.
    fn dynamic(&self) -> bool {
        matches!(self, CoreNodeKind::Custom(custom) if custom.source == DYNAMIC_SOURCE)
    }
}
6 changes: 3 additions & 3 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, OutputId,
RunningNode,
log, node_communication::spawn_listener_loop, node_inputs, CoreNodeKindExt, DoraEvent, Event,
OutputId, RunningNode,
};
use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
Expand All @@ -9,7 +9,7 @@
config::DataId,
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE,
ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE,
},
get_python_path,
uhlc::HLC,
Expand Down Expand Up @@ -101,7 +101,7 @@
let resolved_path = if source_is_url(source) {
// try to download the shared library
let target_dir = Path::new("build");
download_file(source, &target_dir)

Check warning on line 104 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
.await
.wrap_err("failed to download custom node")?
} else {
Expand Down
1 change: 1 addition & 0 deletions binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(download_file(&python_source.source, &target_path))

Check warning on line 50 in binaries/runtime/src/operator/python.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
.wrap_err("failed to download Python operator")?
} else {
Path::new(&python_source.source).to_owned()
Expand Down Expand Up @@ -290,6 +290,7 @@
use super::SendOutputCallback;
use aligned_vec::{AVec, ConstAlign};
use arrow::{array::ArrayData, pyarrow::FromPyArrow};
use dora_core::metadata::ArrowTypeInfoExt;
use dora_message::metadata::ArrowTypeInfo;
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-daemons/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
descriptor::{read_as_descriptor, DescriptorExt},
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT},
};
use dora_message::{
Expand Down Expand Up @@ -115,7 +115,7 @@ async fn start_dataflow(
dataflow: &Path,
coordinator_events_tx: &Sender<Event>,
) -> eyre::Result<Uuid> {
let dataflow_descriptor = Descriptor::read(dataflow)
let dataflow_descriptor = read_as_descriptor(dataflow)
.await
.wrap_err("failed to read yaml dataflow")?;
let working_dir = dataflow
Expand Down
2 changes: 1 addition & 1 deletion libraries/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-message = { workspace = true }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
Expand All @@ -22,4 +23,3 @@ tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
schemars = "0.8.19"
serde_json = "1.0.117"
log = { version = "0.4.21", features = ["serde"] }
uhlc = "0.5.1"
2 changes: 1 addition & 1 deletion libraries/core/src/bin/generate_schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{env, path::Path};

use dora_core::descriptor::Descriptor;
use dora_message::descriptor::Descriptor;
use schemars::schema_for;

fn main() {
Expand Down
Loading
Loading