From 7c44e7a2e60dfa11a1557615a7efbc4ebb418f48 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 13 Nov 2024 15:01:28 +0100 Subject: [PATCH 1/5] Move more types from `dora-core` to `dora-message` to avoid dependency Make `dora-message` a dependency of `dora-core`, instead of the other way around. This way, we can continue to freely bump the version of `dora-core` with the other workspace crates, without introducing errors such as #708. --- Cargo.lock | 8 +- binaries/cli/src/attach.rs | 2 +- binaries/cli/src/build.rs | 2 +- binaries/cli/src/graph/mod.rs | 2 +- binaries/cli/src/lib.rs | 2 +- binaries/coordinator/src/lib.rs | 2 +- binaries/coordinator/src/run/mod.rs | 6 +- binaries/daemon/src/lib.rs | 7 +- examples/multiple-daemons/run.rs | 4 +- libraries/core/Cargo.toml | 2 +- libraries/core/src/bin/generate_schema.rs | 2 +- libraries/core/src/descriptor/mod.rs | 493 ++++-------------- libraries/core/src/descriptor/validate.rs | 10 +- libraries/core/src/descriptor/visualize.rs | 9 +- libraries/core/src/lib.rs | 3 +- libraries/message/Cargo.toml | 7 +- libraries/message/src/cli_to_coordinator.rs | 7 +- libraries/message/src/common.rs | 3 +- libraries/{core => message}/src/config.rs | 285 +++------- libraries/message/src/coordinator_to_cli.rs | 6 +- .../message/src/coordinator_to_daemon.rs | 8 +- .../message/src/daemon_to_coordinator.rs | 4 +- libraries/message/src/daemon_to_daemon.rs | 7 +- libraries/message/src/daemon_to_node.rs | 9 +- libraries/message/src/descriptor.rs | 331 ++++++++++++ libraries/message/src/id.rs | 121 +++++ libraries/message/src/lib.rs | 5 + libraries/message/src/metadata.rs | 1 - libraries/message/src/node_to_daemon.rs | 9 +- 29 files changed, 709 insertions(+), 648 deletions(-) rename libraries/{core => message}/src/config.rs (76%) create mode 100644 libraries/message/src/descriptor.rs create mode 100644 libraries/message/src/id.rs diff --git a/Cargo.lock b/Cargo.lock index ff140c4d..ed113c4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2403,6 +2403,7 @@ dependencies = [ name = "dora-core" version = "0.3.6" dependencies = [ + "dora-message", "eyre", "log", "once_cell", @@ -2413,7 +2414,6 @@ dependencies = [ "serde_yaml 0.9.34+deprecated", "tokio", "tracing", - "uhlc", "uuid", "which", ] @@ -2486,12 +2486,16 @@ dependencies = [ "aligned-vec", "arrow-data", "arrow-schema", - "dora-core", "eyre", "log", + "once_cell", + "schemars", "semver", "serde", + "serde-with-expand-env", + "serde_yaml 0.9.34+deprecated", "tokio", + "uhlc", "uuid", ] diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 80eefd36..f2e5eeba 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -1,6 +1,6 @@ use colored::Colorize; use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; -use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor}; +use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt}; use dora_message::cli_to_coordinator::ControlRequest; use dora_message::common::LogMessage; use dora_message::coordinator_to_cli::ControlRequestReply; diff --git a/binaries/cli/src/build.rs b/binaries/cli/src/build.rs index 16fdfb84..ee88bc65 100644 --- a/binaries/cli/src/build.rs +++ b/binaries/cli/src/build.rs @@ -1,6 +1,6 @@ use dora_core::{ config::OperatorId, - descriptor::{Descriptor, SINGLE_OPERATOR_DEFAULT_ID}, + descriptor::{Descriptor, DescriptorExt, NodeExt, SINGLE_OPERATOR_DEFAULT_ID}, }; use eyre::{eyre, Context}; use std::{path::Path, process::Command}; diff --git a/binaries/cli/src/graph/mod.rs b/binaries/cli/src/graph/mod.rs index c28f9eb4..0dbd55b6 100644 --- a/binaries/cli/src/graph/mod.rs +++ b/binaries/cli/src/graph/mod.rs @@ -1,6 +1,6 @@ use std::{fs::File, io::Write, path::Path}; -use dora_core::descriptor::Descriptor; +use dora_core::descriptor::{Descriptor, DescriptorExt}; use eyre::Context; const MERMAID_TEMPLATE: &str = include_str!("mermaid-template.html"); diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index c86c3664..d50a9b17 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -3,7 +3,7 @@ use colored::Colorize; use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; use dora_coordinator::Event; use dora_core::{ - descriptor::{source_is_url, Descriptor}, + descriptor::{source_is_url, Descriptor, DescriptorExt}, topics::{ DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 0af6b283..7ca861f6 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -5,7 +5,6 @@ use crate::{ pub use control::ControlEvent; use dora_core::{ config::{NodeId, OperatorId}, - descriptor::{Descriptor, ResolvedNode}, uhlc::{self, HLC}, }; use dora_message::{ @@ -16,6 +15,7 @@ use dora_message::{ }, coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped}, daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, + descriptor::{Descriptor, ResolvedNode}, }; use eyre::{bail, eyre, ContextCompat, Result, WrapErr}; use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt}; diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 1aec2c9c..1ba6b74d 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -3,13 +3,11 @@ use crate::{ DaemonConnection, }; -use dora_core::{ - descriptor::{Descriptor, ResolvedNode}, - uhlc::HLC, -}; +use dora_core::{descriptor::DescriptorExt, uhlc::HLC}; use dora_message::{ coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped}, daemon_to_coordinator::DaemonCoordinatorReply, + descriptor::{Descriptor, ResolvedNode}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use std::{ diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 166d3164..6fbe7be9 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -3,7 +3,10 @@ use coordinator::CoordinatorEvent; use crossbeam::queue::ArrayQueue; use dora_core::{ config::{DataId, Input, InputMapping, NodeId, OperatorId}, - descriptor::{runtime_node_inputs, CoreNodeKind, Descriptor, ResolvedNode}, + descriptor::{ + read_as_descriptor, runtime_node_inputs, CoreNodeKind, Descriptor, DescriptorExt, + ResolvedNode, + }, topics::LOCALHOST, uhlc::{self, HLC}, }; @@ -162,7 +165,7 @@ impl Daemon { .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))? .to_owned(); - let descriptor = Descriptor::read(dataflow_path).await?; + let descriptor = read_as_descriptor(dataflow_path).await?; descriptor.check(&working_dir)?; let nodes = descriptor.resolve_aliases_and_set_defaults()?; diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 25b5d277..77020401 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,6 +1,6 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ - descriptor::Descriptor, + descriptor::{read_as_descriptor, DescriptorExt}, topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT}, }; use dora_message::{ @@ -115,7 +115,7 @@ async fn start_dataflow( dataflow: &Path, coordinator_events_tx: &Sender, ) -> eyre::Result { - let dataflow_descriptor = Descriptor::read(dataflow) + let dataflow_descriptor = read_as_descriptor(dataflow) .await .wrap_err("failed to read yaml dataflow")?; let working_dir = dataflow diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index d3b83876..9b321562 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -10,6 +10,7 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +dora-message = { workspace = true } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" @@ -22,4 +23,3 @@ tokio = { version = "1.24.1", features = ["fs", "process", "sync"] } schemars = "0.8.19" serde_json = "1.0.117" log = { version = "0.4.21", features = ["serde"] } -uhlc = "0.5.1" diff --git a/libraries/core/src/bin/generate_schema.rs b/libraries/core/src/bin/generate_schema.rs index 5ddd3286..efa12a82 100644 --- a/libraries/core/src/bin/generate_schema.rs +++ b/libraries/core/src/bin/generate_schema.rs @@ -1,6 +1,6 @@ use std::{env, path::Path}; -use dora_core::descriptor::Descriptor; +use dora_message::descriptor::Descriptor; use schemars::schema_for; fn main() { diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 92b3460c..8db78a10 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,41 +1,43 @@ -use crate::config::{ - CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId, +use dora_message::{ + config::{Input, InputMapping, NodeRunConfig}, + id::{DataId, OperatorId}, }; -use eyre::{bail, eyre, Context, OptionExt, Result}; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; -use serde_with_expand_env::with_expand_envs; +use eyre::{bail, Context, OptionExt, Result}; use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeMap, HashMap}, env::consts::EXE_EXTENSION, - fmt, path::{Path, PathBuf}, }; -use tracing::warn; + +// reexport for compatibility +pub use dora_message::descriptor::{ + runtime_node_inputs, CoreNodeKind, CustomNode, Descriptor, Node, OperatorConfig, + OperatorDefinition, OperatorSource, PythonSource, ResolvedDeploy, ResolvedNode, RuntimeNode, + SingleOperatorDefinition, DYNAMIC_SOURCE, SHELL_SOURCE, +}; pub use visualize::collect_dora_timers; + mod validate; mod visualize; -pub const SHELL_SOURCE: &str = "shell"; -pub const DYNAMIC_SOURCE: &str = "dynamic"; -/// Dataflow description -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] -#[serde(deny_unknown_fields)] -#[schemars(title = "dora-rs specification")] -pub struct Descriptor { - #[schemars(skip)] - #[serde(default)] - pub communication: CommunicationConfig, - #[schemars(skip)] - #[serde(default, rename = "_unstable_deploy")] - pub deploy: Deploy, - pub nodes: Vec, +pub trait DescriptorExt { + fn resolve_aliases_and_set_defaults(&self) -> eyre::Result>; + fn visualize_as_mermaid(&self) -> eyre::Result; + fn blocking_read(path: &Path) -> eyre::Result; + fn parse(buf: Vec) -> eyre::Result; + fn check(&self, working_dir: &Path) -> eyre::Result<()>; + fn check_in_daemon( + &self, + working_dir: &Path, + remote_machine_id: &[&str], + coordinator_is_remote: bool, + ) -> eyre::Result<()>; } pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op"; -impl Descriptor { - pub fn resolve_aliases_and_set_defaults(&self) -> eyre::Result> { +impl DescriptorExt for Descriptor { + fn resolve_aliases_and_set_defaults(&self) -> eyre::Result> { let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string()); let single_operator_nodes: HashMap<_, _> = self @@ -51,7 +53,7 @@ impl Descriptor { let mut resolved = vec![]; for mut node in self.nodes.clone() { // adjust input mappings - let mut node_kind = node.kind_mut()?; + let mut node_kind = node_kind_mut(&mut node)?; let input_mappings: Vec<_> = match &mut node_kind { NodeKindMut::Standard { path: _, inputs } => inputs.values_mut().collect(), NodeKindMut::Runtime(node) => node @@ -102,7 +104,14 @@ impl Descriptor { name: node.name, description: node.description, env: node.env, - deploy: ResolvedDeploy::new(node.deploy, self), + deploy: { + let default_machine = self.deploy.machine.as_deref().unwrap_or_default(); + let machine = match node.deploy.machine { + Some(m) => m, + None => default_machine.to_owned(), + }; + ResolvedDeploy { machine } + }, kind, }); } @@ -110,35 +119,28 @@ impl Descriptor { Ok(resolved) } - pub fn visualize_as_mermaid(&self) -> eyre::Result { + fn visualize_as_mermaid(&self) -> eyre::Result { let resolved = self.resolve_aliases_and_set_defaults()?; let flowchart = visualize::visualize_nodes(&resolved); Ok(flowchart) } - pub async fn read(path: &Path) -> eyre::Result { - let buf = tokio::fs::read(path) - .await - .context("failed to open given file")?; - Descriptor::parse(buf) - } - - pub fn blocking_read(path: &Path) -> eyre::Result { + fn blocking_read(path: &Path) -> eyre::Result { let buf = std::fs::read(path).context("failed to open given file")?; Descriptor::parse(buf) } - pub fn parse(buf: Vec) -> eyre::Result { + fn parse(buf: Vec) -> eyre::Result { serde_yaml::from_slice(&buf).context("failed to parse given descriptor") } - pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { + fn check(&self, working_dir: &Path) -> eyre::Result<()> { validate::check_dataflow(self, working_dir, None, false) .wrap_err("Dataflow could not be validated.") } - pub fn check_in_daemon( + fn check_in_daemon( &self, working_dir: &Path, remote_machine_id: &[&str], @@ -154,53 +156,70 @@ impl Descriptor { } } -#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] -#[serde(deny_unknown_fields)] -pub struct Deploy { - pub machine: Option, +pub async fn read_as_descriptor(path: &Path) -> eyre::Result { + let buf = tokio::fs::read(path) + .await + .context("failed to open given file")?; + Descriptor::parse(buf) +} + +fn node_kind_mut(node: &mut Node) -> eyre::Result { + match node.kind()? { + NodeKind::Standard(_) => node + .path + .as_ref() + .map(|path| NodeKindMut::Standard { + path, + inputs: &mut node.inputs, + }) + .ok_or_eyre("no path"), + NodeKind::Runtime(_) => node + .operators + .as_mut() + .map(NodeKindMut::Runtime) + .ok_or_eyre("no operators"), + NodeKind::Custom(_) => node + .custom + .as_mut() + .map(NodeKindMut::Custom) + .ok_or_eyre("no custom"), + NodeKind::Operator(_) => node + .operator + .as_mut() + .map(NodeKindMut::Operator) + .ok_or_eyre("no operator"), + } } -/// Dora Node -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] -#[serde(deny_unknown_fields)] -pub struct Node { - /// Node identifier - pub id: NodeId, - /// Node name - pub name: Option, - /// Description of the node - pub description: Option, - /// Environment variables - pub env: Option>, +pub fn source_is_url(source: &str) -> bool { + source.contains("://") +} - /// Unstable machine deployment configuration - #[schemars(skip)] - #[serde(default, rename = "_unstable_deploy")] - pub deploy: Deploy, +pub fn resolve_path(source: &str, working_dir: &Path) -> Result { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; - #[serde(default, skip_serializing_if = "Option::is_none")] - operators: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - custom: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - operator: Option, + // Search path within current working directory + if let Ok(abs_path) = working_dir.join(&path).canonicalize() { + Ok(abs_path) + // Search path within $PATH + } else if let Ok(abs_path) = which::which(&path) { + Ok(abs_path) + } else { + bail!("Could not find source path {}", path.display()) + } +} - #[serde(default, skip_serializing_if = "Option::is_none")] - pub path: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub args: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub build: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub send_stdout_as: Option, - #[serde(default)] - pub inputs: BTreeMap, - #[serde(default)] - pub outputs: BTreeSet, +pub trait NodeExt { + fn kind(&self) -> eyre::Result; } -impl Node { - pub fn kind(&self) -> eyre::Result { +impl NodeExt for Node { + fn kind(&self) -> eyre::Result { match (&self.path, &self.operators, &self.custom, &self.operator) { (None, None, None, None) => { eyre::bail!( @@ -220,34 +239,6 @@ impl Node { } } } - - fn kind_mut(&mut self) -> eyre::Result { - match self.kind()? { - NodeKind::Standard(_) => self - .path - .as_ref() - .map(|path| NodeKindMut::Standard { - path, - inputs: &mut self.inputs, - }) - .ok_or_eyre("no path"), - NodeKind::Runtime(_) => self - .operators - .as_mut() - .map(NodeKindMut::Runtime) - .ok_or_eyre("no operators"), - NodeKind::Custom(_) => self - .custom - .as_mut() - .map(NodeKindMut::Custom) - .ok_or_eyre("no custom"), - NodeKind::Operator(_) => self - .operator - .as_mut() - .map(NodeKindMut::Operator) - .ok_or_eyre("no operator"), - } - } } #[derive(Debug)] @@ -270,291 +261,3 @@ enum NodeKindMut<'a> { Custom(&'a mut CustomNode), Operator(&'a mut SingleOperatorDefinition), } - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ResolvedNode { - pub id: NodeId, - pub name: Option, - pub description: Option, - pub env: Option>, - - #[serde(default)] - pub deploy: ResolvedDeploy, - - #[serde(flatten)] - pub kind: CoreNodeKind, -} - -impl ResolvedNode { - pub fn send_stdout_as(&self) -> Result> { - match &self.kind { - // TODO: Split stdout between operators - CoreNodeKind::Runtime(n) => { - let count = n - .operators - .iter() - .filter(|op| op.config.send_stdout_as.is_some()) - .count(); - if count == 1 && n.operators.len() > 1 { - warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") - } else if count > 1 { - return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime.")); - } - Ok(n.operators.iter().find_map(|op| { - op.config - .send_stdout_as - .clone() - .map(|stdout| format!("{}/{}", op.id, stdout)) - })) - } - CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()), - } - } -} - -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct ResolvedDeploy { - pub machine: String, -} -impl ResolvedDeploy { - fn new(deploy: Deploy, descriptor: &Descriptor) -> Self { - let default_machine = descriptor.deploy.machine.as_deref().unwrap_or_default(); - let machine = match deploy.machine { - Some(m) => m, - None => default_machine.to_owned(), - }; - Self { machine } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum CoreNodeKind { - /// Dora runtime node - #[serde(rename = "operators")] - Runtime(RuntimeNode), - Custom(CustomNode), -} - -pub fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap { - n.operators - .iter() - .flat_map(|operator| { - operator.config.inputs.iter().map(|(input_id, mapping)| { - ( - DataId::from(format!("{}/{input_id}", operator.id)), - mapping.clone(), - ) - }) - }) - .collect() -} - -fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet { - n.operators - .iter() - .flat_map(|operator| { - operator - .config - .outputs - .iter() - .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) - }) - .collect() -} - -impl CoreNodeKind { - pub fn run_config(&self) -> NodeRunConfig { - match self { - CoreNodeKind::Runtime(n) => NodeRunConfig { - inputs: runtime_node_inputs(n), - outputs: runtime_node_outputs(n), - }, - CoreNodeKind::Custom(n) => n.run_config.clone(), - } - } - - pub fn dynamic(&self) -> bool { - match self { - CoreNodeKind::Runtime(_n) => false, - CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] -#[serde(transparent)] -pub struct RuntimeNode { - pub operators: Vec, -} - -#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] -pub struct OperatorDefinition { - pub id: OperatorId, - #[serde(flatten)] - pub config: OperatorConfig, -} - -#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] -pub struct SingleOperatorDefinition { - /// ID is optional if there is only a single operator. - pub id: Option, - #[serde(flatten)] - pub config: OperatorConfig, -} - -#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] -pub struct OperatorConfig { - pub name: Option, - pub description: Option, - - #[serde(default)] - pub inputs: BTreeMap, - #[serde(default)] - pub outputs: BTreeSet, - - #[serde(flatten)] - pub source: OperatorSource, - - #[serde(default, skip_serializing_if = "Option::is_none")] - pub build: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub send_stdout_as: Option, -} - -#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] -#[serde(rename_all = "kebab-case")] -pub enum OperatorSource { - SharedLibrary(String), - Python(PythonSource), - #[schemars(skip)] - Wasm(String), -} -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -#[serde( - deny_unknown_fields, - from = "PythonSourceDef", - into = "PythonSourceDef" -)] -pub struct PythonSource { - pub source: String, - pub conda_env: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -#[serde(untagged)] -pub enum PythonSourceDef { - SourceOnly(String), - WithOptions { - source: String, - conda_env: Option, - }, -} - -impl From for PythonSourceDef { - fn from(input: PythonSource) -> Self { - match input { - PythonSource { - source, - conda_env: None, - } => Self::SourceOnly(source), - PythonSource { source, conda_env } => Self::WithOptions { source, conda_env }, - } - } -} - -impl From for PythonSource { - fn from(value: PythonSourceDef) -> Self { - match value { - PythonSourceDef::SourceOnly(source) => Self { - source, - conda_env: None, - }, - PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env }, - } - } -} - -pub fn source_is_url(source: &str) -> bool { - source.contains("://") -} - -pub fn resolve_path(source: &str, working_dir: &Path) -> Result { - let path = Path::new(&source); - let path = if path.extension().is_none() { - path.with_extension(EXE_EXTENSION) - } else { - path.to_owned() - }; - - // Search path within current working directory - if let Ok(abs_path) = working_dir.join(&path).canonicalize() { - Ok(abs_path) - // Search path within $PATH - } else if let Ok(abs_path) = which::which(&path) { - Ok(abs_path) - } else { - bail!("Could not find source path {}", path.display()) - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(deny_unknown_fields)] -pub struct PythonOperatorConfig { - pub path: PathBuf, - #[serde(default)] - pub inputs: BTreeMap, - #[serde(default)] - pub outputs: BTreeSet, -} - -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] -pub struct CustomNode { - /// Path of the source code - /// - /// If you want to use a specific `conda` environment. - /// Provide the python path within the source. - /// - /// source: /home/peter/miniconda3/bin/python - /// - /// args: some_node.py - /// - /// Source can match any executable in PATH. - pub source: String, - /// Args for the executable. - #[serde(default, skip_serializing_if = "Option::is_none")] - pub args: Option, - /// Environment variables for the custom nodes - /// - /// Deprecated, use outer-level `env` field instead. - pub envs: Option>, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub build: Option, - /// Send stdout and stderr to another node - #[serde(skip_serializing_if = "Option::is_none")] - pub send_stdout_as: Option, - - #[serde(flatten)] - pub run_config: NodeRunConfig, -} - -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] -#[serde(untagged)] -pub enum EnvValue { - #[serde(deserialize_with = "with_expand_envs")] - Bool(bool), - #[serde(deserialize_with = "with_expand_envs")] - Integer(u64), - #[serde(deserialize_with = "with_expand_envs")] - String(String), -} - -impl fmt::Display for EnvValue { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()), - EnvValue::Integer(u64) => fmt.write_str(&u64.to_string()), - EnvValue::String(str) => fmt.write_str(str), - } - } -} diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index ff593f85..6ad29103 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,15 +1,19 @@ use crate::{ adjust_shared_library_path, - config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url}, get_python_path, }; +use dora_message::{ + config::{Input, InputMapping, UserInputMapping}, + descriptor::{CoreNodeKind, OperatorSource, DYNAMIC_SOURCE, SHELL_SOURCE}, + id::{DataId, OperatorId}, +}; use eyre::{bail, eyre, Context}; use std::{path::Path, process::Command}; use tracing::info; -use super::{resolve_path, Descriptor, DYNAMIC_SOURCE, SHELL_SOURCE}; +use super::{resolve_path, Descriptor, DescriptorExt}; const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow( diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index 11d7a270..54a1aad5 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,5 +1,10 @@ -use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; -use crate::config::{format_duration, DataId, Input, InputMapping, NodeId, UserInputMapping}; +use dora_message::{ + config::{format_duration, Input, InputMapping, UserInputMapping}, + descriptor::{CoreNodeKind, OperatorDefinition}, + id::{DataId, NodeId}, +}; + +use super::{CustomNode, ResolvedNode, RuntimeNode}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, fmt::Write as _, diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 08f0a611..7c47211f 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -5,9 +5,8 @@ use std::{ path::Path, }; -pub use uhlc; +pub use dora_message::{config, uhlc}; -pub mod config; pub mod descriptor; pub mod topics; diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 27810734..eca897d0 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -17,8 +17,13 @@ serde = { version = "1.0.136", features = ["derive"] } eyre = "0.6.8" arrow-schema = { workspace = true, features = ["serde"] } tokio = "1.39.2" -dora-core = { workspace = true } +# dora-core = { workspace = true } uuid = { version = "1.7", features = ["serde", "v7"] } log = { version = "0.4.21", features = ["serde"] } aligned-vec = { version = "0.5.0", features = ["serde"] } semver = { version = "1.0.23", features = ["serde"] } +schemars = "0.8.19" +uhlc = "0.5.1" +serde_yaml = "0.9.11" +once_cell = "1.13.0" +serde-with-expand-env = "1.1.0" diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index 64482596..fc44833f 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -1,10 +1,11 @@ use std::{path::PathBuf, time::Duration}; -use dora_core::{ - config::{NodeId, OperatorId}, +use uuid::Uuid; + +use crate::{ descriptor::Descriptor, + id::{NodeId, OperatorId}, }; -use uuid::Uuid; #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum ControlRequest { diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 03a75e88..9ca2eb82 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -2,10 +2,9 @@ use core::fmt; use std::borrow::Cow; use aligned_vec::{AVec, ConstAlign}; -use dora_core::{config::NodeId, uhlc}; use uuid::Uuid; -use crate::DataflowId; +use crate::{id::NodeId, DataflowId}; pub use log::Level as LogLevel; diff --git a/libraries/core/src/config.rs b/libraries/message/src/config.rs similarity index 76% rename from libraries/core/src/config.rs rename to libraries/message/src/config.rs index 0fcf11f2..69d8edc4 100644 --- a/libraries/core/src/config.rs +++ b/libraries/message/src/config.rs @@ -1,129 +1,87 @@ -use once_cell::sync::OnceCell; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; +use core::fmt; use std::{ - borrow::Borrow, collections::{BTreeMap, BTreeSet}, - convert::Infallible, - fmt, - str::FromStr, time::Duration, }; -#[derive( - Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema, -)] -pub struct NodeId(String); - -impl FromStr for NodeId { - type Err = Infallible; - - fn from_str(s: &str) -> Result { - Ok(Self(s.to_owned())) - } -} - -impl From for NodeId { - fn from(id: String) -> Self { - Self(id) - } -} - -impl std::fmt::Display for NodeId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl AsRef for NodeId { - fn as_ref(&self) -> &str { - &self.0 - } -} - -#[derive( - Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema, -)] -pub struct OperatorId(String); - -impl FromStr for OperatorId { - type Err = Infallible; - - fn from_str(s: &str) -> Result { - Ok(Self(s.to_owned())) - } -} - -impl From for OperatorId { - fn from(id: String) -> Self { - Self(id) - } -} - -impl std::fmt::Display for OperatorId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl AsRef for OperatorId { - fn as_ref(&self) -> &str { - &self.0 - } -} - -#[derive( - Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema, -)] -pub struct DataId(String); - -impl From for String { - fn from(id: DataId) -> Self { - id.0 - } -} - -impl From for DataId { - fn from(id: String) -> Self { - Self(id) - } -} - -impl std::fmt::Display for DataId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} +use once_cell::sync::OnceCell; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; -impl std::ops::Deref for DataId { - type Target = String; +pub use crate::id::{DataId, NodeId, OperatorId}; - fn deref(&self) -> &Self::Target { - &self.0 - } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +pub struct NodeRunConfig { + /// Inputs for the nodes as a map from input ID to `node_id/output_id`. + /// + /// e.g. + /// + /// inputs: + /// + /// example_input: example_node/example_output1 + /// + #[serde(default)] + pub inputs: BTreeMap, + /// List of output IDs. + /// + /// e.g. + /// + /// outputs: + /// + /// - output_1 + /// + /// - output_2 + #[serde(default)] + pub outputs: BTreeSet, } -impl AsRef for DataId { - fn as_ref(&self) -> &String { - &self.0 - } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields, from = "InputDef", into = "InputDef")] +pub struct Input { + pub mapping: InputMapping, + pub queue_size: Option, } -impl AsRef for DataId { - fn as_ref(&self) -> &str { - &self.0 - } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum InputDef { + MappingOnly(InputMapping), + WithOptions { + source: InputMapping, + queue_size: Option, + }, } -impl Borrow for DataId { - fn borrow(&self) -> &String { - &self.0 +impl From for InputDef { + fn from(input: Input) -> Self { + match input { + Input { + mapping, + queue_size: None, + } => Self::MappingOnly(mapping), + Input { + mapping, + queue_size, + } => Self::WithOptions { + source: mapping, + queue_size, + }, + } } } -impl Borrow for DataId { - fn borrow(&self) -> &str { - &self.0 +impl From for Input { + fn from(value: InputDef) -> Self { + match value { + InputDef::MappingOnly(mapping) => Self { + mapping, + queue_size: None, + }, + InputDef::WithOptions { source, queue_size } => Self { + mapping: source, + queue_size, + }, + } } } @@ -158,6 +116,22 @@ impl fmt::Display for InputMapping { } } +pub struct FormattedDuration(pub Duration); + +impl fmt::Display for FormattedDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0.subsec_millis() == 0 { + write!(f, "secs/{}", self.0.as_secs()) + } else { + write!(f, "millis/{}", self.0.as_millis()) + } + } +} + +pub fn format_duration(interval: Duration) -> FormattedDuration { + FormattedDuration(interval) +} + impl Serialize for InputMapping { fn serialize(&self, serializer: S) -> Result where @@ -233,97 +207,6 @@ pub struct UserInputMapping { pub output: DataId, } -pub struct FormattedDuration(pub Duration); - -impl fmt::Display for FormattedDuration { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.0.subsec_millis() == 0 { - write!(f, "secs/{}", self.0.as_secs()) - } else { - write!(f, "millis/{}", self.0.as_millis()) - } - } -} - -pub fn format_duration(interval: Duration) -> FormattedDuration { - FormattedDuration(interval) -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -pub struct NodeRunConfig { - /// Inputs for the nodes as a map from input ID to `node_id/output_id`. - /// - /// e.g. - /// - /// inputs: - /// - /// example_input: example_node/example_output1 - /// - #[serde(default)] - pub inputs: BTreeMap, - /// List of output IDs. - /// - /// e.g. - /// - /// outputs: - /// - /// - output_1 - /// - /// - output_2 - #[serde(default)] - pub outputs: BTreeSet, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -#[serde(deny_unknown_fields, from = "InputDef", into = "InputDef")] -pub struct Input { - pub mapping: InputMapping, - pub queue_size: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum InputDef { - MappingOnly(InputMapping), - WithOptions { - source: InputMapping, - queue_size: Option, - }, -} - -impl From for InputDef { - fn from(input: Input) -> Self { - match input { - Input { - mapping, - queue_size: None, - } => Self::MappingOnly(mapping), - Input { - mapping, - queue_size, - } => Self::WithOptions { - source: mapping, - queue_size, - }, - } - } -} - -impl From for Input { - fn from(value: InputDef) -> Self { - match value { - InputDef::MappingOnly(mapping) => Self { - mapping, - queue_size: None, - }, - InputDef::WithOptions { source, queue_size } => Self { - mapping: source, - queue_size, - }, - } - } -} - #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, Clone)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub struct CommunicationConfig { diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index 9d61ae6e..e8eb64d0 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -1,11 +1,9 @@ use std::collections::{BTreeMap, BTreeSet}; -use dora_core::config::NodeId; -use dora_core::uhlc; use uuid::Uuid; -pub use crate::common::LogMessage; -pub use crate::common::{NodeError, NodeErrorCause, NodeExitStatus}; +pub use crate::common::{LogMessage, NodeError, NodeErrorCause, NodeExitStatus}; +use crate::id::NodeId; #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index e8741f62..463a857e 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -1,13 +1,11 @@ use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf, time::Duration}; -use dora_core::{ - config::{NodeId, OperatorId}, - // TODO: how should we version these? +use crate::{ descriptor::{Descriptor, ResolvedNode}, + id::{NodeId, OperatorId}, + DataflowId, }; -use crate::DataflowId; - pub use crate::common::Timestamped; #[derive(Debug, serde::Serialize, serde::Deserialize)] diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs index 23178a55..528419f4 100644 --- a/libraries/message/src/daemon_to_coordinator.rs +++ b/libraries/message/src/daemon_to_coordinator.rs @@ -1,11 +1,9 @@ use std::collections::BTreeMap; -use dora_core::{config::NodeId, uhlc}; - pub use crate::common::{ DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped, }; -use crate::{current_crate_version, versions_compatible, DataflowId}; +use crate::{current_crate_version, id::NodeId, versions_compatible, DataflowId}; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum CoordinatorRequest { diff --git a/libraries/message/src/daemon_to_daemon.rs b/libraries/message/src/daemon_to_daemon.rs index 20fb5dd9..457cc3a0 100644 --- a/libraries/message/src/daemon_to_daemon.rs +++ b/libraries/message/src/daemon_to_daemon.rs @@ -1,9 +1,12 @@ use std::collections::BTreeSet; use aligned_vec::{AVec, ConstAlign}; -use dora_core::config::{DataId, NodeId}; -use crate::{metadata::Metadata, DataflowId}; +use crate::{ + id::{DataId, NodeId}, + metadata::Metadata, + DataflowId, +}; #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum InterDaemonEvent { diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index 178b3377..acc1630e 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -1,12 +1,13 @@ use std::{net::SocketAddr, path::PathBuf}; -use dora_core::{ - config::{DataId, NodeId, NodeRunConfig, OperatorId}, +use crate::{ + config::NodeRunConfig, descriptor::{Descriptor, OperatorDefinition}, + id::{DataId, NodeId, OperatorId}, + metadata::Metadata, + DataflowId, }; -use crate::{metadata::Metadata, DataflowId}; - pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped}; // Passed via env variable diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs new file mode 100644 index 00000000..a2530616 --- /dev/null +++ b/libraries/message/src/descriptor.rs @@ -0,0 +1,331 @@ +use crate::{ + config::{CommunicationConfig, Input, InputMapping, NodeRunConfig}, + id::{DataId, NodeId, OperatorId}, +}; +use eyre::{eyre, Result}; +use log::warn; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_with_expand_env::with_expand_envs; +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt, + path::PathBuf, +}; + +pub const SHELL_SOURCE: &str = "shell"; +pub const DYNAMIC_SOURCE: &str = "dynamic"; + +/// Dataflow description +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +#[schemars(title = "dora-rs specification")] +pub struct Descriptor { + #[schemars(skip)] + #[serde(default)] + pub communication: CommunicationConfig, + #[schemars(skip)] + #[serde(default, rename = "_unstable_deploy")] + pub deploy: Deploy, + pub nodes: Vec, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct Deploy { + pub machine: Option, +} + +/// Dora Node +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct Node { + /// Node identifier + pub id: NodeId, + /// Node name + pub name: Option, + /// Description of the node + pub description: Option, + /// Environment variables + pub env: Option>, + + /// Unstable machine deployment configuration + #[schemars(skip)] + #[serde(default, rename = "_unstable_deploy")] + pub deploy: Deploy, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub operators: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub custom: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub operator: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub path: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub args: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResolvedNode { + pub id: NodeId, + pub name: Option, + pub description: Option, + pub env: Option>, + + #[serde(default)] + pub deploy: ResolvedDeploy, + + #[serde(flatten)] + pub kind: CoreNodeKind, +} + +impl ResolvedNode { + pub fn send_stdout_as(&self) -> Result> { + match &self.kind { + // TODO: Split stdout between operators + CoreNodeKind::Runtime(n) => { + let count = n + .operators + .iter() + .filter(|op| op.config.send_stdout_as.is_some()) + .count(); + if count == 1 && n.operators.len() > 1 { + warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") + } else if count > 1 { + return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime.")); + } + Ok(n.operators.iter().find_map(|op| { + op.config + .send_stdout_as + .clone() + .map(|stdout| format!("{}/{}", op.id, stdout)) + })) + } + CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()), + } + } +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ResolvedDeploy { + pub machine: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CoreNodeKind { + /// Dora runtime node + #[serde(rename = "operators")] + Runtime(RuntimeNode), + Custom(CustomNode), +} + +pub fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap { + n.operators + .iter() + .flat_map(|operator| { + operator.config.inputs.iter().map(|(input_id, mapping)| { + ( + DataId::from(format!("{}/{input_id}", operator.id)), + mapping.clone(), + ) + }) + }) + .collect() +} + +fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet { + n.operators + .iter() + .flat_map(|operator| { + operator + .config + .outputs + .iter() + .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) + }) + .collect() +} + +impl CoreNodeKind { + pub fn run_config(&self) -> NodeRunConfig { + match self { + CoreNodeKind::Runtime(n) => NodeRunConfig { + inputs: runtime_node_inputs(n), + outputs: runtime_node_outputs(n), + }, + CoreNodeKind::Custom(n) => n.run_config.clone(), + } + } + + pub fn dynamic(&self) -> bool { + match self { + CoreNodeKind::Runtime(_n) => false, + CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(transparent)] +pub struct RuntimeNode { + pub operators: Vec, +} + +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] +pub struct OperatorDefinition { + pub id: OperatorId, + #[serde(flatten)] + pub config: OperatorConfig, +} + +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] +pub struct SingleOperatorDefinition { + /// ID is optional if there is only a single operator. + pub id: Option, + #[serde(flatten)] + pub config: OperatorConfig, +} + +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] +pub struct OperatorConfig { + pub name: Option, + pub description: Option, + + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, + + #[serde(flatten)] + pub source: OperatorSource, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, +} + +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] +#[serde(rename_all = "kebab-case")] +pub enum OperatorSource { + SharedLibrary(String), + Python(PythonSource), + #[schemars(skip)] + Wasm(String), +} +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde( + deny_unknown_fields, + from = "PythonSourceDef", + into = "PythonSourceDef" +)] +pub struct PythonSource { + pub source: String, + pub conda_env: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(untagged)] +pub enum PythonSourceDef { + SourceOnly(String), + WithOptions { + source: String, + conda_env: Option, + }, +} + +impl From for PythonSourceDef { + fn from(input: PythonSource) -> Self { + match input { + PythonSource { + source, + conda_env: None, + } => Self::SourceOnly(source), + PythonSource { source, conda_env } => Self::WithOptions { source, conda_env }, + } + } +} + +impl From for PythonSource { + fn from(value: PythonSourceDef) -> Self { + match value { + PythonSourceDef::SourceOnly(source) => Self { + source, + conda_env: None, + }, + PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env }, + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct PythonOperatorConfig { + pub path: PathBuf, + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct CustomNode { + /// Path of the source code + /// + /// If you want to use a specific `conda` environment. + /// Provide the python path within the source. + /// + /// source: /home/peter/miniconda3/bin/python + /// + /// args: some_node.py + /// + /// Source can match any executable in PATH. + pub source: String, + /// Args for the executable. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub args: Option, + /// Environment variables for the custom nodes + /// + /// Deprecated, use outer-level `env` field instead. + pub envs: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub build: Option, + /// Send stdout and stderr to another node + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, + + #[serde(flatten)] + pub run_config: NodeRunConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(untagged)] +pub enum EnvValue { + #[serde(deserialize_with = "with_expand_envs")] + Bool(bool), + #[serde(deserialize_with = "with_expand_envs")] + Integer(u64), + #[serde(deserialize_with = "with_expand_envs")] + String(String), +} + +impl fmt::Display for EnvValue { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()), + EnvValue::Integer(u64) => fmt.write_str(&u64.to_string()), + EnvValue::String(str) => fmt.write_str(str), + } + } +} diff --git a/libraries/message/src/id.rs b/libraries/message/src/id.rs new file mode 100644 index 00000000..b14a2a0c --- /dev/null +++ b/libraries/message/src/id.rs @@ -0,0 +1,121 @@ +use std::{borrow::Borrow, convert::Infallible, str::FromStr}; + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema, +)] +pub struct NodeId(pub(crate) String); + +impl FromStr for NodeId { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_owned())) + } +} + +impl From for NodeId { + fn from(id: String) -> Self { + Self(id) + } +} + +impl std::fmt::Display for NodeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl AsRef for NodeId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +#[derive( + Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema, +)] +pub struct OperatorId(String); + +impl FromStr for OperatorId { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_owned())) + } +} + +impl From for OperatorId { + fn from(id: String) -> Self { + Self(id) + } +} + +impl std::fmt::Display for OperatorId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl AsRef for OperatorId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +#[derive( + Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema, +)] +pub struct DataId(String); + +impl From for String { + fn from(id: DataId) -> Self { + id.0 + } +} + +impl From for DataId { + fn from(id: String) -> Self { + Self(id) + } +} + +impl std::fmt::Display for DataId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl std::ops::Deref for DataId { + type Target = String; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl AsRef for DataId { + fn as_ref(&self) -> &String { + &self.0 + } +} + +impl AsRef for DataId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Borrow for DataId { + fn borrow(&self) -> &String { + &self.0 + } +} + +impl Borrow for DataId { + fn borrow(&self) -> &str { + &self.0 + } +} diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index a435b34f..795a9c37 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -3,7 +3,12 @@ #![allow(clippy::missing_safety_doc)] +pub use uhlc; + pub mod common; +pub mod config; +pub mod descriptor; +pub mod id; pub mod metadata; pub mod coordinator_to_daemon; diff --git a/libraries/message/src/metadata.rs b/libraries/message/src/metadata.rs index fe526dac..9179eb5c 100644 --- a/libraries/message/src/metadata.rs +++ b/libraries/message/src/metadata.rs @@ -2,7 +2,6 @@ use std::collections::BTreeMap; use arrow_data::ArrayData; use arrow_schema::DataType; -use dora_core::uhlc; use eyre::Context; use serde::{Deserialize, Serialize}; diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs index 6967c1a3..bb5a0850 100644 --- a/libraries/message/src/node_to_daemon.rs +++ b/libraries/message/src/node_to_daemon.rs @@ -1,9 +1,12 @@ pub use crate::common::{ DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped, }; -use crate::{current_crate_version, metadata::Metadata, versions_compatible, DataflowId}; - -use dora_core::config::{DataId, NodeId}; +use crate::{ + current_crate_version, + id::{DataId, NodeId}, + metadata::Metadata, + versions_compatible, DataflowId, +}; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum DaemonRequest { From 49aef8bac6cc58aa974ae26f7858000e84ab5558 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 13 Nov 2024 19:29:17 +0100 Subject: [PATCH 2/5] Move more implementation code out of `dora-message` into `dora-core` Ensures that we need to update `dora-message` less often. --- apis/rust/node/src/node/mod.rs | 1 + binaries/daemon/src/lib.rs | 73 +++++++++++++++++-- binaries/daemon/src/spawn.rs | 6 +- binaries/runtime/src/operator/python.rs | 1 + libraries/core/src/descriptor/mod.rs | 7 +- libraries/core/src/descriptor/validate.rs | 33 ++++++++- libraries/core/src/lib.rs | 1 + libraries/core/src/metadata.rs | 89 +++++++++++++++++++++++ libraries/message/src/descriptor.rs | 75 ------------------- libraries/message/src/lib.rs | 3 + libraries/message/src/metadata.rs | 73 ------------------- 11 files changed, 202 insertions(+), 160 deletions(-) create mode 100644 libraries/core/src/metadata.rs diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 97a58ebb..21affd7c 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -10,6 +10,7 @@ use arrow::array::Array; use dora_core::{ config::{DataId, NodeId, NodeRunConfig}, descriptor::Descriptor, + metadata::ArrowTypeInfoExt, topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST}, uhlc, }; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 6fbe7be9..4757f15c 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2,10 +2,10 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use crossbeam::queue::ArrayQueue; use dora_core::{ - config::{DataId, Input, InputMapping, NodeId, OperatorId}, + config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId}, descriptor::{ - read_as_descriptor, runtime_node_inputs, CoreNodeKind, Descriptor, DescriptorExt, - ResolvedNode, + read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode, + DYNAMIC_SOURCE, }, topics::LOCALHOST, uhlc::{self, HLC}, @@ -23,7 +23,7 @@ use dora_message::{ node_to_daemon::{DynamicNodeEvent, Timestamped}, DataflowId, }; -use dora_node_api::Parameter; +use dora_node_api::{arrow::datatypes::DataType, Parameter}; use eyre::{bail, eyre, Context, ContextCompat, Result}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; @@ -1568,7 +1568,7 @@ impl RunningDataflow { let metadata = metadata::Metadata::from_parameters( hlc.new_timestamp(), - ArrowTypeInfo::empty(), + empty_type_info(), parameters, ); @@ -1675,6 +1675,18 @@ impl RunningDataflow { } } +fn empty_type_info() -> ArrowTypeInfo { + ArrowTypeInfo { + data_type: DataType::Null, + len: 0, + null_count: 0, + validity: None, + offset: 0, + buffer_offsets: Vec::new(), + child_data: Vec::new(), + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OutputId(NodeId, DataId); type InputId = (NodeId, DataId); @@ -1824,3 +1836,54 @@ impl CascadingErrorCauses { self.caused_by.entry(affected_node).or_insert(causing_node); } } + +fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap { + n.operators + .iter() + .flat_map(|operator| { + operator.config.inputs.iter().map(|(input_id, mapping)| { + ( + DataId::from(format!("{}/{input_id}", operator.id)), + mapping.clone(), + ) + }) + }) + .collect() +} + +fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet { + n.operators + .iter() + .flat_map(|operator| { + operator + .config + .outputs + .iter() + .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) + }) + .collect() +} + +trait CoreNodeKindExt { + fn run_config(&self) -> NodeRunConfig; + fn dynamic(&self) -> bool; +} + +impl CoreNodeKindExt for CoreNodeKind { + fn run_config(&self) -> NodeRunConfig { + match self { + CoreNodeKind::Runtime(n) => NodeRunConfig { + inputs: runtime_node_inputs(n), + outputs: runtime_node_outputs(n), + }, + CoreNodeKind::Custom(n) => n.run_config.clone(), + } + } + + fn dynamic(&self) -> bool { + match self { + CoreNodeKind::Runtime(_n) => false, + CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE, + } + } +} diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 87eca5a3..eacf8933 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,6 +1,6 @@ use crate::{ - log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, OutputId, - RunningNode, + log, node_communication::spawn_listener_loop, node_inputs, CoreNodeKindExt, DoraEvent, Event, + OutputId, RunningNode, }; use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; @@ -9,7 +9,7 @@ use dora_core::{ config::DataId, descriptor::{ resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, - ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE, + ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE, }, get_python_path, uhlc::HLC, diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 4885dc37..23cbea6c 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -290,6 +290,7 @@ mod callback_impl { use super::SendOutputCallback; use aligned_vec::{AVec, ConstAlign}; use arrow::{array::ArrayData, pyarrow::FromPyArrow}; + use dora_core::metadata::ArrowTypeInfoExt; use dora_message::metadata::ArrowTypeInfo; use dora_node_api::{ arrow_utils::{copy_array_into_sample, required_data_size}, diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 8db78a10..021734d7 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -11,10 +11,11 @@ use std::{ // reexport for compatibility pub use dora_message::descriptor::{ - runtime_node_inputs, CoreNodeKind, CustomNode, Descriptor, Node, OperatorConfig, - OperatorDefinition, OperatorSource, PythonSource, ResolvedDeploy, ResolvedNode, RuntimeNode, - SingleOperatorDefinition, DYNAMIC_SOURCE, SHELL_SOURCE, + CoreNodeKind, CustomNode, Descriptor, Node, OperatorConfig, OperatorDefinition, OperatorSource, + PythonSource, ResolvedDeploy, ResolvedNode, RuntimeNode, SingleOperatorDefinition, + DYNAMIC_SOURCE, SHELL_SOURCE, }; +pub use validate::ResolvedNodeExt; pub use visualize::collect_dora_timers; mod validate; diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 6ad29103..cffc7f82 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -6,7 +6,7 @@ use crate::{ use dora_message::{ config::{Input, InputMapping, UserInputMapping}, - descriptor::{CoreNodeKind, OperatorSource, DYNAMIC_SOURCE, SHELL_SOURCE}, + descriptor::{CoreNodeKind, OperatorSource, ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE}, id::{DataId, OperatorId}, }; use eyre::{bail, eyre, Context}; @@ -116,6 +116,37 @@ pub fn check_dataflow( Ok(()) } +pub trait ResolvedNodeExt { + fn send_stdout_as(&self) -> eyre::Result>; +} + +impl ResolvedNodeExt for ResolvedNode { + fn send_stdout_as(&self) -> eyre::Result> { + match &self.kind { + // TODO: Split stdout between operators + CoreNodeKind::Runtime(n) => { + let count = n + .operators + .iter() + .filter(|op| op.config.send_stdout_as.is_some()) + .count(); + if count == 1 && n.operators.len() > 1 { + tracing::warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") + } else if count > 1 { + return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime.")); + } + Ok(n.operators.iter().find_map(|op| { + op.config + .send_stdout_as + .clone() + .map(|stdout| format!("{}/{}", op.id, stdout)) + })) + } + CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()), + } + } +} + fn check_input( input: &Input, nodes: &[super::ResolvedNode], diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 7c47211f..e9ba36a3 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -8,6 +8,7 @@ use std::{ pub use dora_message::{config, uhlc}; pub mod descriptor; +pub mod metadata; pub mod topics; pub fn adjust_shared_library_path(path: &Path) -> Result { diff --git a/libraries/core/src/metadata.rs b/libraries/core/src/metadata.rs new file mode 100644 index 00000000..1aa85275 --- /dev/null +++ b/libraries/core/src/metadata.rs @@ -0,0 +1,89 @@ +use dora_message::{ + arrow_data::ArrayData, + arrow_schema::DataType, + metadata::{ArrowTypeInfo, BufferOffset}, +}; +use eyre::Context; + +pub trait ArrowTypeInfoExt { + fn empty() -> Self; + fn byte_array(data_len: usize) -> Self; + unsafe fn from_array( + array: &ArrayData, + region_start: *const u8, + region_len: usize, + ) -> eyre::Result + where + Self: Sized; +} + +impl ArrowTypeInfoExt for ArrowTypeInfo { + fn empty() -> Self { + Self { + data_type: DataType::Null, + len: 0, + null_count: 0, + validity: None, + offset: 0, + buffer_offsets: Vec::new(), + child_data: Vec::new(), + } + } + + fn byte_array(data_len: usize) -> Self { + Self { + data_type: DataType::UInt8, + len: data_len, + null_count: 0, + validity: None, + offset: 0, + buffer_offsets: vec![BufferOffset { + offset: 0, + len: data_len, + }], + child_data: Vec::new(), + } + } + + unsafe fn from_array( + array: &ArrayData, + region_start: *const u8, + region_len: usize, + ) -> eyre::Result { + Ok(Self { + data_type: array.data_type().clone(), + len: array.len(), + null_count: array.null_count(), + validity: array.nulls().map(|b| b.validity().to_owned()), + offset: array.offset(), + buffer_offsets: array + .buffers() + .iter() + .map(|b| { + let ptr = b.as_ptr(); + if ptr as usize <= region_start as usize { + eyre::bail!("ptr {ptr:p} starts before region {region_start:p}"); + } + if ptr as usize >= region_start as usize + region_len { + eyre::bail!("ptr {ptr:p} starts after region {region_start:p}"); + } + if ptr as usize + b.len() > region_start as usize + region_len { + eyre::bail!("ptr {ptr:p} ends after region {region_start:p}"); + } + let offset = usize::try_from(unsafe { ptr.offset_from(region_start) }) + .context("offset_from is negative")?; + + Result::<_, eyre::Report>::Ok(BufferOffset { + offset, + len: b.len(), + }) + }) + .collect::>()?, + child_data: array + .child_data() + .iter() + .map(|c| unsafe { Self::from_array(c, region_start, region_len) }) + .collect::>()?, + }) + } +} diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index a2530616..f45e0f5f 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -2,8 +2,6 @@ use crate::{ config::{CommunicationConfig, Input, InputMapping, NodeRunConfig}, id::{DataId, NodeId, OperatorId}, }; -use eyre::{eyre, Result}; -use log::warn; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_with_expand_env::with_expand_envs; @@ -89,33 +87,6 @@ pub struct ResolvedNode { pub kind: CoreNodeKind, } -impl ResolvedNode { - pub fn send_stdout_as(&self) -> Result> { - match &self.kind { - // TODO: Split stdout between operators - CoreNodeKind::Runtime(n) => { - let count = n - .operators - .iter() - .filter(|op| op.config.send_stdout_as.is_some()) - .count(); - if count == 1 && n.operators.len() > 1 { - warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") - } else if count > 1 { - return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime.")); - } - Ok(n.operators.iter().find_map(|op| { - op.config - .send_stdout_as - .clone() - .map(|stdout| format!("{}/{}", op.id, stdout)) - })) - } - CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()), - } - } -} - #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ResolvedDeploy { pub machine: String, @@ -130,52 +101,6 @@ pub enum CoreNodeKind { Custom(CustomNode), } -pub fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap { - n.operators - .iter() - .flat_map(|operator| { - operator.config.inputs.iter().map(|(input_id, mapping)| { - ( - DataId::from(format!("{}/{input_id}", operator.id)), - mapping.clone(), - ) - }) - }) - .collect() -} - -fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet { - n.operators - .iter() - .flat_map(|operator| { - operator - .config - .outputs - .iter() - .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) - }) - .collect() -} - -impl CoreNodeKind { - pub fn run_config(&self) -> NodeRunConfig { - match self { - CoreNodeKind::Runtime(n) => NodeRunConfig { - inputs: runtime_node_inputs(n), - outputs: runtime_node_outputs(n), - }, - CoreNodeKind::Custom(n) => n.run_config.clone(), - } - } - - pub fn dynamic(&self) -> bool { - match self { - CoreNodeKind::Runtime(_n) => false, - CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE, - } - } -} - #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(transparent)] pub struct RuntimeNode { diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index 795a9c37..989e5a72 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -22,6 +22,9 @@ pub mod node_to_daemon; pub mod cli_to_coordinator; pub mod coordinator_to_cli; +pub use arrow_data; +pub use arrow_schema; + pub type DataflowId = uuid::Uuid; fn current_crate_version() -> semver::Version { diff --git a/libraries/message/src/metadata.rs b/libraries/message/src/metadata.rs index 9179eb5c..f0328c5c 100644 --- a/libraries/message/src/metadata.rs +++ b/libraries/message/src/metadata.rs @@ -1,8 +1,6 @@ use std::collections::BTreeMap; -use arrow_data::ArrayData; use arrow_schema::DataType; -use eyre::Context; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -57,77 +55,6 @@ pub struct ArrowTypeInfo { pub child_data: Vec, } -impl ArrowTypeInfo { - pub const fn empty() -> Self { - Self { - data_type: DataType::Null, - len: 0, - null_count: 0, - validity: None, - offset: 0, - buffer_offsets: Vec::new(), - child_data: Vec::new(), - } - } - - pub fn byte_array(data_len: usize) -> Self { - Self { - data_type: DataType::UInt8, - len: data_len, - null_count: 0, - validity: None, - offset: 0, - buffer_offsets: vec![BufferOffset { - offset: 0, - len: data_len, - }], - child_data: Vec::new(), - } - } - - pub unsafe fn from_array( - array: &ArrayData, - region_start: *const u8, - region_len: usize, - ) -> eyre::Result { - Ok(Self { - data_type: array.data_type().clone(), - len: array.len(), - null_count: array.null_count(), - validity: array.nulls().map(|b| b.validity().to_owned()), - offset: array.offset(), - buffer_offsets: array - .buffers() - .iter() - .map(|b| { - let ptr = b.as_ptr(); - if ptr as usize <= region_start as usize { - eyre::bail!("ptr {ptr:p} starts before region {region_start:p}"); - } - if ptr as usize >= region_start as usize + region_len { - eyre::bail!("ptr {ptr:p} starts after region {region_start:p}"); - } - if ptr as usize + b.len() > region_start as usize + region_len { - eyre::bail!("ptr {ptr:p} ends after region {region_start:p}"); - } - let offset = usize::try_from(unsafe { ptr.offset_from(region_start) }) - .context("offset_from is negative")?; - - Result::<_, eyre::Report>::Ok(BufferOffset { - offset, - len: b.len(), - }) - }) - .collect::>()?, - child_data: array - .child_data() - .iter() - .map(|c| unsafe { Self::from_array(c, region_start, region_len) }) - .collect::>()?, - }) - } -} - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub enum Parameter { Bool(bool), From d1500696b8f0ee28507a2fcd9e929fbaebcb0aa2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 14 Nov 2024 16:15:26 +0100 Subject: [PATCH 3/5] Removed commented-out dependency on `dora-core` --- libraries/message/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index eca897d0..20082c95 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -17,7 +17,6 @@ serde = { version = "1.0.136", features = ["derive"] } eyre = "0.6.8" arrow-schema = { workspace = true, features = ["serde"] } tokio = "1.39.2" -# dora-core = { workspace = true } uuid = { version = "1.7", features = ["serde", "v7"] } log = { version = "0.4.21", features = ["serde"] } aligned-vec = { version = "0.5.0", features = ["serde"] } From b669971c7ccc7f6b09dd420834435d931fce0ad9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 14 Nov 2024 16:17:49 +0100 Subject: [PATCH 4/5] Fix: Enable tokio sync feature Required when building just the `dora-message` crate --- libraries/message/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 20082c95..c0ee45fd 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -16,7 +16,7 @@ arrow-data = { workspace = true } serde = { version = "1.0.136", features = ["derive"] } eyre = "0.6.8" arrow-schema = { workspace = true, features = ["serde"] } -tokio = "1.39.2" +tokio = { version = "1.39.2", features = ["sync"] } uuid = { version = "1.7", features = ["serde", "v7"] } log = { version = "0.4.21", features = ["serde"] } aligned-vec = { version = "0.5.0", features = ["serde"] } From 7dc6b1d91d433c2f9b44b57522e9339152824ef5 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 14 Nov 2024 16:19:08 +0100 Subject: [PATCH 5/5] Remove `deny_unknown_fields` from `dora-message` type definitions To ensure forward compatiblity --- libraries/message/src/config.rs | 2 +- libraries/message/src/descriptor.rs | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/libraries/message/src/config.rs b/libraries/message/src/config.rs index 69d8edc4..203a302f 100644 --- a/libraries/message/src/config.rs +++ b/libraries/message/src/config.rs @@ -36,7 +36,7 @@ pub struct NodeRunConfig { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -#[serde(deny_unknown_fields, from = "InputDef", into = "InputDef")] +#[serde(from = "InputDef", into = "InputDef")] pub struct Input { pub mapping: InputMapping, pub queue_size: Option, diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index f45e0f5f..a84963e3 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -150,11 +150,7 @@ pub enum OperatorSource { Wasm(String), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -#[serde( - deny_unknown_fields, - from = "PythonSourceDef", - into = "PythonSourceDef" -)] +#[serde(from = "PythonSourceDef", into = "PythonSourceDef")] pub struct PythonSource { pub source: String, pub conda_env: Option,