diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml index e8d74c3a8a5..319bf0ae243 100644 --- a/hydroflow/Cargo.toml +++ b/hydroflow/Cargo.toml @@ -11,8 +11,9 @@ description = "Hydro's low-level dataflow runtime and IR" workspace = true [features] -default = [ "macros", "debugging" ] +default = [ "macros", "debugging", "meta" ] +meta = [ "dep:hydroflow_lang" ] macros = [ "hydroflow_macro", "hydroflow_datalog" ] hydroflow_macro = [ "dep:hydroflow_macro" ] hydroflow_datalog = [ "dep:hydroflow_datalog" ] @@ -35,7 +36,7 @@ bytes = "1.1.0" futures = "0.3.0" hydroflow_deploy_integration = { optional = true, path = "../hydro_deploy/hydroflow_deploy_integration", version = "^0.10.0" } hydroflow_datalog = { optional = true, path = "../hydroflow_datalog", version = "^0.10.0" } -hydroflow_lang = { path = "../hydroflow_lang", version = "^0.10.0" } +hydroflow_lang = { path = "../hydroflow_lang", version = "^0.10.0", optional = true } hydroflow_macro = { optional = true, path = "../hydroflow_macro", version = "^0.10.0" } itertools = "0.10.0" lattices = { path = "../lattices", version = "^0.5.8", features = [ "serde" ] } diff --git a/hydroflow/src/lib.rs b/hydroflow/src/lib.rs index effc596dc57..63c6c2f4836 100644 --- a/hydroflow/src/lib.rs +++ b/hydroflow/src/lib.rs @@ -20,12 +20,14 @@ pub mod compiled; pub mod scheduled; pub mod util; +#[cfg(feature = "meta")] +pub use hydroflow_lang as lang; #[cfg(feature = "python")] pub use pyo3; pub use variadics::{self, var_args, var_expr, var_type}; pub use { - bincode, bytes, futures, hydroflow_lang as lang, itertools, lattices, pusherator, rustc_hash, - serde, serde_json, tokio, tokio_stream, tokio_util, tracing, web_time, + bincode, bytes, futures, itertools, lattices, pusherator, rustc_hash, serde, serde_json, tokio, + tokio_stream, tokio_util, tracing, web_time, }; /// `#[macro_use]` automagically brings the declarative macro export to the crate-level. diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 8c36f3bf7fb..5a5b302e731 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -6,7 +6,9 @@ use std::cell::Cell; use std::future::Future; use std::marker::PhantomData; +#[cfg(feature = "meta")] use hydroflow_lang::diagnostic::{Diagnostic, SerdeSpan}; +#[cfg(feature = "meta")] use hydroflow_lang::graph::HydroflowGraph; use ref_cast::RefCast; use smallvec::SmallVec; @@ -31,8 +33,11 @@ pub struct Hydroflow<'a> { handoffs: Vec, + #[cfg(feature = "meta")] /// See [`Self::meta_graph()`]. meta_graph: Option, + + #[cfg(feature = "meta")] /// See [`Self::diagnostics()`]. diagnostics: Option>>, } @@ -131,29 +136,36 @@ impl<'a> Hydroflow<'a> { /// Assign the `HydroflowGraph` via JSON string. #[doc(hidden)] - pub fn __assign_meta_graph(&mut self, meta_graph_json: &str) { - let mut meta_graph: HydroflowGraph = - serde_json::from_str(meta_graph_json).expect("Failed to deserialize graph."); - - let mut op_inst_diagnostics = Vec::new(); - meta_graph.insert_node_op_insts_all(&mut op_inst_diagnostics); - assert!( - op_inst_diagnostics.is_empty(), - "Expected no diagnostics, got: {:#?}", - op_inst_diagnostics - ); + pub fn __assign_meta_graph(&mut self, _meta_graph_json: &str) { + #[cfg(feature = "meta")] + { + let mut meta_graph: HydroflowGraph = + serde_json::from_str(_meta_graph_json).expect("Failed to deserialize graph."); + + let mut op_inst_diagnostics = Vec::new(); + meta_graph.insert_node_op_insts_all(&mut op_inst_diagnostics); + assert!( + op_inst_diagnostics.is_empty(), + "Expected no diagnostics, got: {:#?}", + op_inst_diagnostics + ); - assert!(self.meta_graph.replace(meta_graph).is_none()); + assert!(self.meta_graph.replace(meta_graph).is_none()); + } } /// Assign the diagnostics via JSON string. #[doc(hidden)] - pub fn __assign_diagnostics(&mut self, diagnostics_json: &'static str) { - let diagnostics: Vec> = - serde_json::from_str(diagnostics_json).expect("Failed to deserialize diagnostics."); + pub fn __assign_diagnostics(&mut self, _diagnostics_json: &'static str) { + #[cfg(feature = "meta")] + { + let diagnostics: Vec> = serde_json::from_str(_diagnostics_json) + .expect("Failed to deserialize diagnostics."); - assert!(self.diagnostics.replace(diagnostics).is_none()); + assert!(self.diagnostics.replace(diagnostics).is_none()); + } } + #[cfg(feature = "meta")] /// Return a handle to the meta `HydroflowGraph` if set. The `HydroflowGraph is a /// representation of all the operators, subgraphs, and handoffs in this `Hydroflow` instance. /// Will only be set if this graph was constructed using a surface syntax macro. @@ -161,6 +173,7 @@ impl<'a> Hydroflow<'a> { self.meta_graph.as_ref() } + #[cfg(feature = "meta")] /// Returns any diagnostics generated by the surface syntax macro. Each diagnostic is a pair of /// (1) a `Diagnostic` with span info reset and (2) the `ToString` version of the diagnostic /// with original span info. diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml index d209f43f81a..f6e42bb7d53 100644 --- a/hydroflow_plus/Cargo.toml +++ b/hydroflow_plus/Cargo.toml @@ -14,27 +14,27 @@ workspace = true path = "src/lib.rs" [features] -default = ["deploy_runtime"] +default = [] stageleft_devel = [] -deploy_runtime = [ "hydroflow/deploy_integration" ] -deploy = [ "deploy_runtime", "dep:hydro_deploy", "dep:trybuild-internals-api", "dep:toml", "dep:prettyplease" ] +deploy = [ "build", "dep:hydro_deploy", "dep:trybuild-internals-api", "dep:toml", "dep:prettyplease", "dep:sha2", "dep:stageleft_tool", "dep:nameof" ] +build = [ "dep:hydroflow_lang" ] [dependencies] bincode = "1.3.1" hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0", optional = true } -hydroflow = { path = "../hydroflow", version = "^0.10.0", default-features = false } -hydroflow_lang = { path = "../hydroflow_lang", version = "^0.10.0" } +hydroflow = { path = "../hydroflow", version = "^0.10.0", default-features = false, features = ["deploy_integration"] } +hydroflow_lang = { path = "../hydroflow_lang", version = "^0.10.0", optional = true } match_box = "0.0.2" -nameof = "1.0.0" +nameof = { version = "1.0.0", optional = true } prettyplease = { version = "0.2.0", features = [ "verbatim" ], optional = true } proc-macro-crate = "1.0.0" proc-macro2 = "1.0.74" quote = "1.0.35" sealed = "0.6.0" serde = { version = "1.0.197", features = [ "derive" ] } -sha2 = "0.10.0" +sha2 = { version = "0.10.0", optional = true } stageleft = { path = "../stageleft", version = "^0.5.0" } -stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0" } +stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0", optional = true } syn = { version = "2.0.46", features = [ "parsing", "extra-traits", "visit-mut" ] } tokio = { version = "1.29.0", features = [ "full" ] } toml = { version = "0.8.0", optional = true } diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index dbfe070ede7..5bda46deab7 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -3,17 +3,23 @@ use std::collections::HashMap; use std::marker::PhantomData; use std::rc::Rc; +#[cfg(feature = "build")] use compiled::CompiledFlow; +#[cfg(feature = "build")] use deploy::{DeployFlow, DeployResult}; use stageleft::*; +#[cfg(feature = "build")] use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy}; use crate::ir::HfPlusLeaf; use crate::location::{Cluster, ExternalProcess, Process}; use crate::staging_util::Invariant; +#[cfg(feature = "build")] pub mod built; +#[cfg(feature = "build")] pub mod compiled; +#[cfg(feature = "build")] pub mod deploy; pub struct FlowStateInner { @@ -89,6 +95,7 @@ impl<'a> FlowBuilder<'a> { } } + #[cfg(feature = "build")] pub fn finalize(mut self) -> built::BuiltFlow<'a> { self.finalized = true; @@ -101,10 +108,12 @@ impl<'a> FlowBuilder<'a> { } } + #[cfg(feature = "build")] pub fn with_default_optimize>(self) -> DeployFlow<'a, D> { self.finalize().with_default_optimize() } + #[cfg(feature = "build")] pub fn optimize_with( self, f: impl FnOnce(Vec) -> Vec, @@ -158,6 +167,7 @@ impl<'a> FlowBuilder<'a> { } } + #[cfg(feature = "build")] pub fn with_process>( self, process: &Process

, @@ -166,6 +176,7 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_process(process, spec) } + #[cfg(feature = "build")] pub fn with_external>( self, process: &ExternalProcess

, @@ -174,6 +185,7 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_external(process, spec) } + #[cfg(feature = "build")] pub fn with_cluster>( self, cluster: &Cluster, @@ -182,14 +194,17 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_cluster(cluster, spec) } + #[cfg(feature = "build")] pub fn compile>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> { self.with_default_optimize::().compile(env) } + #[cfg(feature = "build")] pub fn compile_no_network>(self) -> CompiledFlow<'a, D::GraphId> { self.with_default_optimize::().compile_no_network() } + #[cfg(feature = "build")] pub fn deploy>( self, env: &mut D::InstantiateEnv, diff --git a/hydroflow_plus/src/deploy/deploy_graph.rs b/hydroflow_plus/src/deploy/deploy_graph.rs index 15cb6f1aaf9..a69332167d5 100644 --- a/hydroflow_plus/src/deploy/deploy_graph.rs +++ b/hydroflow_plus/src/deploy/deploy_graph.rs @@ -15,17 +15,17 @@ use hydro_deploy::hydroflow_crate::HydroflowCrateService; use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate}; use hydroflow::bytes::Bytes; use hydroflow::futures::{Sink, SinkExt, Stream, StreamExt}; -use hydroflow::lang::graph::HydroflowGraph; use hydroflow::util::deploy::{ConnectedSink, ConnectedSource}; +use hydroflow_lang::graph::HydroflowGraph; use nameof::name_of; use serde::de::DeserializeOwned; use serde::Serialize; use stageleft::{QuotedWithContext, RuntimeData}; use tokio::sync::RwLock; -use super::deploy_runtime::*; use super::trybuild::create_graph_trybuild; use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort}; +use crate::deploy_runtime::*; pub struct HydroDeploy {} diff --git a/hydroflow_plus/src/deploy/macro_runtime.rs b/hydroflow_plus/src/deploy/macro_runtime.rs index 50eb298d971..a61b417b9da 100644 --- a/hydroflow_plus/src/deploy/macro_runtime.rs +++ b/hydroflow_plus/src/deploy/macro_runtime.rs @@ -9,8 +9,8 @@ use hydroflow::util::deploy::DeployPorts; use hydroflow_lang::graph::HydroflowGraph; use stageleft::{QuotedWithContext, RuntimeData}; -use super::HydroflowPlusMeta; use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort}; +use crate::deploy_runtime::HydroflowPlusMeta; pub struct DeployRuntime {} @@ -35,7 +35,7 @@ impl<'a> Deploy<'a> for DeployRuntime { } } - fn trivail_cluster(_id: usize) -> Self::Cluster { + fn trivial_cluster(_id: usize) -> Self::Cluster { DeployRuntimeCluster { next_port: Rc::new(RefCell::new(0)), } @@ -60,7 +60,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _p2: &Self::Process, p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - super::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str()) + crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str()) } fn o2o_connect( @@ -79,7 +79,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _c2: &Self::Cluster, c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - super::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str()) + crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str()) } fn o2m_connect( @@ -98,7 +98,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _p2: &Self::Process, p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - super::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str()) + crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str()) } fn m2o_connect( @@ -117,7 +117,7 @@ impl<'a> Deploy<'a> for DeployRuntime { _c2: &Self::Cluster, c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - super::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str()) + crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str()) } fn m2m_connect( @@ -171,11 +171,11 @@ impl<'a> Deploy<'a> for DeployRuntime { env: &Self::CompileEnv, of_cluster: usize, ) -> impl QuotedWithContext<'a, &'a Vec, ()> + Copy + 'a { - super::deploy_runtime::cluster_members(*env, of_cluster) + crate::deploy_runtime::cluster_members(*env, of_cluster) } fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a { - super::deploy_runtime::cluster_self_id(*env) + crate::deploy_runtime::cluster_self_id(*env) } } diff --git a/hydroflow_plus/src/deploy/mod.rs b/hydroflow_plus/src/deploy/mod.rs index 8eee87ff035..028524ba088 100644 --- a/hydroflow_plus/src/deploy/mod.rs +++ b/hydroflow_plus/src/deploy/mod.rs @@ -9,8 +9,8 @@ use serde::de::DeserializeOwned; use serde::Serialize; use stageleft::QuotedWithContext; -#[cfg(feature = "deploy_runtime")] pub mod macro_runtime; +pub use macro_runtime::*; #[cfg(feature = "deploy")] pub(crate) mod trybuild; @@ -20,15 +20,9 @@ pub(crate) mod trybuild; #[doc(hidden)] pub mod trybuild_rewriters; -pub use macro_runtime::*; #[cfg(feature = "deploy")] pub use trybuild::init_test; -#[cfg(feature = "deploy_runtime")] -pub mod deploy_runtime; -#[cfg(feature = "deploy_runtime")] -pub use deploy_runtime::HydroflowPlusMeta; - #[cfg(feature = "deploy")] pub mod deploy_graph; @@ -81,7 +75,7 @@ pub trait Deploy<'a> { panic!("No trivial process") } - fn trivail_cluster(_id: usize) -> Self::Cluster { + fn trivial_cluster(_id: usize) -> Self::Cluster { panic!("No trivial cluster") } @@ -205,7 +199,7 @@ impl< } fn trivial_cluster(id: usize) -> Self::Cluster { - >::trivail_cluster(id) + >::trivial_cluster(id) } } diff --git a/hydroflow_plus/src/deploy/trybuild.rs b/hydroflow_plus/src/deploy/trybuild.rs index 94fe61fbead..85bda16b4de 100644 --- a/hydroflow_plus/src/deploy/trybuild.rs +++ b/hydroflow_plus/src/deploy/trybuild.rs @@ -104,7 +104,7 @@ pub fn compile_graph_trybuild(graph: HydroflowGraph, extra_stmts: Vec use hydroflow_plus::*; #[allow(unused)] - fn __hfplus_runtime<'a>(__hydroflow_plus_trybuild_cli: &'a hydroflow_plus::hydroflow::util::deploy::DeployPorts) -> hydroflow_plus::hydroflow::scheduled::graph::Hydroflow<'a> { + fn __hfplus_runtime<'a>(__hydroflow_plus_trybuild_cli: &'a hydroflow_plus::hydroflow::util::deploy::DeployPorts) -> hydroflow_plus::hydroflow::scheduled::graph::Hydroflow<'a> { #(#extra_stmts)* #tokens } diff --git a/hydroflow_plus/src/deploy/deploy_runtime.rs b/hydroflow_plus/src/deploy_runtime.rs similarity index 100% rename from hydroflow_plus/src/deploy/deploy_runtime.rs rename to hydroflow_plus/src/deploy_runtime.rs diff --git a/hydroflow_plus/src/ir.rs b/hydroflow_plus/src/ir.rs index 567bdc34ed5..6534850a7ce 100644 --- a/hydroflow_plus/src/ir.rs +++ b/hydroflow_plus/src/ir.rs @@ -1,16 +1,22 @@ use core::panic; use std::cell::RefCell; -use std::collections::{BTreeMap, HashMap}; +#[cfg(feature = "build")] +use std::collections::BTreeMap; +use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; use std::rc::Rc; +#[cfg(feature = "build")] use hydroflow_lang::graph::FlatGraphBuilder; -use hydroflow_lang::parse::Pipeline; -use proc_macro2::{Span, TokenStream}; +#[cfg(feature = "build")] +use proc_macro2::Span; +use proc_macro2::TokenStream; use quote::ToTokens; +#[cfg(feature = "build")] use syn::parse_quote; +#[cfg(feature = "build")] use crate::deploy::{Deploy, RegisterPort}; use crate::location::LocationId; @@ -54,15 +60,6 @@ impl Debug for DebugInstantiate { } } -#[derive(Clone)] -pub struct DebugPipelineFn(pub Rc Pipeline + 'static>); - -impl Debug for DebugPipelineFn { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "") - } -} - /// A source in a Hydroflow+ graph, where data enters the graph. #[derive(Debug)] pub enum HfPlusSource { @@ -93,6 +90,7 @@ pub enum HfPlusLeaf { } impl HfPlusLeaf { + #[cfg(feature = "build")] pub fn compile_network<'a, D: Deploy<'a>>( self, compile_env: &D::CompileEnv, @@ -147,6 +145,7 @@ impl HfPlusLeaf { } } + #[cfg(feature = "build")] pub fn emit( &self, graph_builders: &mut BTreeMap, @@ -348,9 +347,9 @@ pub enum HfPlusNode { from_key: Option, to_location: LocationId, to_key: Option, - serialize_pipeline: Option, + serialize_fn: Option, instantiate_fn: DebugInstantiate, - deserialize_pipeline: Option, + deserialize_fn: Option, input: Box, }, } @@ -358,6 +357,7 @@ pub enum HfPlusNode { pub type SeenTees = HashMap<*const RefCell, Rc>>; impl<'a> HfPlusNode { + #[cfg(feature = "build")] pub fn compile_network>( &mut self, compile_env: &D::CompileEnv, @@ -534,6 +534,7 @@ impl<'a> HfPlusNode { } } + #[cfg(feature = "build")] pub fn emit( &self, graph_builders: &mut BTreeMap, @@ -1141,9 +1142,9 @@ impl<'a> HfPlusNode { from_key: _, to_location, to_key: _, - serialize_pipeline, + serialize_fn: serialize_pipeline, instantiate_fn, - deserialize_pipeline, + deserialize_fn: deserialize_pipeline, input, } => { let (sink_expr, source_expr, _connect_fn) = match instantiate_fn { @@ -1163,7 +1164,7 @@ impl<'a> HfPlusNode { if let Some(serialize_pipeline) = serialize_pipeline { sender_builder.add_statement(parse_quote! { - #input_ident -> #serialize_pipeline -> dest_sink(#sink_expr); + #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr); }); } else { sender_builder.add_statement(parse_quote! { @@ -1187,7 +1188,7 @@ impl<'a> HfPlusNode { if let Some(deserialize_pipeline) = deserialize_pipeline { receiver_builder.add_statement(parse_quote! { - #receiver_stream_ident = source_stream(#source_expr) -> #deserialize_pipeline; + #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline); }); } else { receiver_builder.add_statement(parse_quote! { @@ -1201,6 +1202,7 @@ impl<'a> HfPlusNode { } } +#[cfg(feature = "build")] #[expect(clippy::too_many_arguments, reason = "networking internals")] fn instantiate_network<'a, D: Deploy<'a>>( from_location: &mut LocationId, diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index 27ac6e51b4a..9512e4389cf 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -27,8 +27,11 @@ pub mod location; pub use location::cluster::CLUSTER_SELF_ID; pub use location::{Cluster, ClusterId, ExternalProcess, Location, Process, Tick, Timestamped}; +#[cfg(feature = "build")] pub mod deploy; +pub mod deploy_runtime; + pub mod cycle; pub mod builder; diff --git a/hydroflow_plus/src/location/external_process.rs b/hydroflow_plus/src/location/external_process.rs index 103aabef869..26a559001d3 100644 --- a/hydroflow_plus/src/location/external_process.rs +++ b/hydroflow_plus/src/location/external_process.rs @@ -11,18 +11,42 @@ use crate::staging_util::Invariant; use crate::{Stream, Unbounded}; pub struct ExternalBytesPort { + #[cfg_attr( + not(feature = "build"), + expect(unused, reason = "unused without feature") + )] pub(crate) process_id: usize, + #[cfg_attr( + not(feature = "build"), + expect(unused, reason = "unused without feature") + )] pub(crate) port_id: usize, } pub struct ExternalBincodeSink { + #[cfg_attr( + not(feature = "build"), + expect(unused, reason = "unused without feature") + )] pub(crate) process_id: usize, + #[cfg_attr( + not(feature = "build"), + expect(unused, reason = "unused without feature") + )] pub(crate) port_id: usize, pub(crate) _phantom: PhantomData, } pub struct ExternalBincodeStream { + #[cfg_attr( + not(feature = "build"), + expect(unused, reason = "unused without feature") + )] pub(crate) process_id: usize, + #[cfg_attr( + not(feature = "build"), + expect(unused, reason = "unused without feature") + )] pub(crate) port_id: usize, pub(crate) _phantom: PhantomData, } @@ -77,6 +101,8 @@ impl<'a, P> ExternalProcess<'a, P> { id }; + let deser_expr: syn::Expr = syn::parse_quote!(|b| b.unwrap().freeze()); + ( ExternalBytesPort { process_id: self.id, @@ -89,9 +115,9 @@ impl<'a, P> ExternalProcess<'a, P> { from_key: Some(next_external_port_id), to_location: to.id(), to_key: None, - serialize_pipeline: None, + serialize_fn: None, instantiate_fn: crate::ir::DebugInstantiate::Building(), - deserialize_pipeline: Some(syn::parse_quote!(map(|b| b.unwrap().freeze()))), + deserialize_fn: Some(deser_expr.into()), input: Box::new(HfPlusNode::Source { source: HfPlusSource::ExternalNetwork(), location_kind: LocationId::ExternalProcess(self.id), @@ -125,9 +151,9 @@ impl<'a, P> ExternalProcess<'a, P> { from_key: Some(next_external_port_id), to_location: to.id(), to_key: None, - serialize_pipeline: None, + serialize_fn: None, instantiate_fn: crate::ir::DebugInstantiate::Building(), - deserialize_pipeline: Some(crate::stream::deserialize_bincode::(None)), + deserialize_fn: Some(crate::stream::deserialize_bincode::(None).into()), input: Box::new(HfPlusNode::Source { source: HfPlusSource::ExternalNetwork(), location_kind: LocationId::ExternalProcess(self.id), diff --git a/hydroflow_plus/src/rewrites/persist_pullup.rs b/hydroflow_plus/src/rewrites/persist_pullup.rs index 2924bf54484..d15784ed458 100644 --- a/hydroflow_plus/src/rewrites/persist_pullup.rs +++ b/hydroflow_plus/src/rewrites/persist_pullup.rs @@ -73,9 +73,9 @@ fn persist_pullup_node( from_key, to_location, to_key, - serialize_pipeline, + serialize_fn, instantiate_fn, - deserialize_pipeline, + deserialize_fn, input: mb!(* HfPlusNode::Persist(behind_persist)), .. } => HfPlusNode::Persist(Box::new(HfPlusNode::Network { @@ -83,9 +83,9 @@ fn persist_pullup_node( from_key, to_location, to_key, - serialize_pipeline, + serialize_fn, instantiate_fn, - deserialize_pipeline, + deserialize_fn, input: behind_persist, })), diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 06db898941a..163654d1a9e 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -6,7 +6,6 @@ use std::rc::Rc; use hydroflow::bytes::Bytes; use hydroflow::futures; -use hydroflow_lang::parse::Pipeline; use serde::de::DeserializeOwned; use serde::Serialize; use stageleft::{q, IntoQuotedMut, QuotedWithContext}; @@ -923,43 +922,43 @@ impl<'a, T, L: Location<'a>, Order> Stream, Bounded, Order> { } } -fn serialize_bincode(is_demux: bool) -> Pipeline { +fn serialize_bincode(is_demux: bool) -> syn::Expr { let root = get_this_crate(); let t_type: syn::Type = stageleft::quote_type::(); if is_demux { parse_quote! { - map(|(id, data): (#root::ClusterId<_>, #t_type)| { + |(id, data): (#root::ClusterId<_>, #t_type)| { (id.raw_id, #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into()) - }) + } } } else { parse_quote! { - map(|data| { + |data| { #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into() - }) + } } } } -pub(super) fn deserialize_bincode(tagged: Option) -> Pipeline { +pub(super) fn deserialize_bincode(tagged: Option) -> syn::Expr { let root = get_this_crate(); let t_type: syn::Type = stageleft::quote_type::(); if let Some(c_type) = tagged { parse_quote! { - map(|res| { + |res| { let (id, b) = res.unwrap(); (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap()) - }) + } } } else { parse_quote! { - map(|res| { + |res| { #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap() - }) + } } } } @@ -1027,9 +1026,9 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { from_key: None, to_location: other.id(), to_key: None, - serialize_pipeline, + serialize_fn: serialize_pipeline.map(|e| e.into()), instantiate_fn: DebugInstantiate::Building(), - deserialize_pipeline, + deserialize_fn: deserialize_pipeline.map(|e| e.into()), input: Box::new(self.ir_node.into_inner()), }, ) @@ -1062,9 +1061,9 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { from_key: None, to_location: other.id(), to_key: Some(external_key), - serialize_pipeline, + serialize_fn: serialize_pipeline.map(|e| e.into()), instantiate_fn: DebugInstantiate::Building(), - deserialize_pipeline: None, + deserialize_fn: None, input: Box::new(self.ir_node.into_inner()), }), }); @@ -1092,14 +1091,14 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { from_key: None, to_location: other.id(), to_key: None, - serialize_pipeline: None, + serialize_fn: None, instantiate_fn: DebugInstantiate::Building(), - deserialize_pipeline: if let Some(c_type) = L::Root::tagged_type() { - Some( - parse_quote!(map(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()))), - ) + deserialize_fn: if let Some(c_type) = L::Root::tagged_type() { + let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze())); + Some(expr.into()) } else { - Some(parse_quote!(map(|b| b.unwrap().freeze()))) + let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze()); + Some(expr.into()) }, input: Box::new(self.ir_node.into_inner()), }, @@ -1125,9 +1124,9 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { from_key: None, to_location: other.id(), to_key: Some(external_key), - serialize_pipeline: None, + serialize_fn: None, instantiate_fn: DebugInstantiate::Building(), - deserialize_pipeline: None, + deserialize_fn: None, input: Box::new(self.ir_node.into_inner()), }), }); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap index 751772daad6..96f896cfb01 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap @@ -22,26 +22,12 @@ expression: built.ir() 1, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (u64 , u64) > (& data) . unwrap () . into () }", - ], - }, - ), + serialize_fn: Some( + | data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (u64 , u64) > (& data) . unwrap () . into () }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: compute_pi :: Worker > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (u64 , u64) > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: compute_pi :: Worker > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (u64 , u64) > (& b) . unwrap ()) }, ), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (u64 , u64) > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | | (0u64 , 0u64) }), diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap index 6e5ae900310..94f1f5ad6f0 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap @@ -9,7 +9,7 @@ expression: ir.surface_syntax_string() 5v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (f64 , f64) , bool > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (x , y) | x * x + y * y < 1.0 })); 6v1 = fold :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < (u64 , u64) > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | | (0u64 , 0u64) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , bool , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , sample_inside | { if sample_inside { * inside += 1 ; } * total += 1 ; } })); 7v1 = map (| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (u64 , u64) > (& data) . unwrap () . into () }); -8v1 = dest_sink ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let c1_port__free = "port_0" ; let env__free = FAKE ; { env__free . port (c1_port__free) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); +8v1 = dest_sink ({ use hydroflow_plus :: __staged :: deploy_runtime :: * ; let c1_port__free = "port_0" ; let env__free = FAKE ; { env__free . port (c1_port__free) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); 1v1 -> 2v1; 2v1 -> 3v1; diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap index d884f0ba97c..a8db79e7fbf 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap @@ -2,7 +2,7 @@ source: hydroflow_plus_test/src/cluster/compute_pi.rs expression: ir.surface_syntax_string() --- -1v1 = source_stream ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let env__free = FAKE ; let p2_port__free = "port_0" ; { env__free . port (p2_port__free) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); +1v1 = source_stream ({ use hydroflow_plus :: __staged :: deploy_runtime :: * ; let env__free = FAKE ; let p2_port__free = "port_0" ; { env__free . port (p2_port__free) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); 2v1 = map (| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: compute_pi :: Worker > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (u64 , u64) > (& b) . unwrap ()) }); 3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: compute_pi :: Worker > , (u64 , u64)) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b })); 4v1 = reduce :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , (u64 , u64) , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } })); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap index 6c202ced1d1..9df002b2a16 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap @@ -14,26 +14,12 @@ expression: built.ir() 0, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , i32) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < i32 > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , i32) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < i32 > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < () > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < () > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }, ), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < () > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap index 2f1d441de9d..f330a4745b5 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap @@ -19,26 +19,12 @@ expression: built.ir() 0, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (std :: string :: String , i32) > (& data) . unwrap () . into () }", - ], - }, - ), + serialize_fn: Some( + | data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (std :: string :: String , i32) > (& data) . unwrap () . into () }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: map_reduce :: Worker > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (std :: string :: String , i32) > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: map_reduce :: Worker > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (std :: string :: String , i32) > (& b) . unwrap ()) }, ), input: Inspect { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (std :: string :: String , i32) , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | (string , count) | println ! ("partition count: {} - {}" , string , count) }), @@ -56,26 +42,12 @@ expression: built.ir() 1, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , std :: string :: String) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < std :: string :: String > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , std :: string :: String) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < std :: string :: String > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () }", - ], - }, - ), + deserialize_fn: Some( + | res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () }, ), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > > > (__hydroflow_plus_cluster_ids_1) } ; | (i , w) | (ids__free [i % ids__free . len ()] , w) }), diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap index 417b0bc0ff7..00787247c6a 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_0.snap @@ -7,8 +7,8 @@ expression: ir.surface_syntax_string() 3v1 = enumerate :: < 'static > (); 4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (usize , std :: string :: String) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > , std :: string :: String) > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > > > (__hydroflow_plus_cluster_ids_1) } ; | (i , w) | (ids__free [i % ids__free . len ()] , w) })); 5v1 = map (| (id , data) : (hydroflow_plus :: ClusterId < _ > , std :: string :: String) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < std :: string :: String > (& data) . unwrap () . into ()) }); -6v1 = dest_sink ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let env__free = FAKE ; let p1_port__free = "port_0" ; { env__free . port (p1_port__free) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } }); -7v1 = source_stream ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let env__free = FAKE ; let p2_port__free = "port_1" ; { env__free . port (p2_port__free) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); +6v1 = dest_sink ({ use hydroflow_plus :: __staged :: deploy_runtime :: * ; let env__free = FAKE ; let p1_port__free = "port_0" ; { env__free . port (p1_port__free) . connect_local_blocking :: < ConnectedDemux < ConnectedDirect > > () . into_sink () } }); +7v1 = source_stream ({ use hydroflow_plus :: __staged :: deploy_runtime :: * ; let env__free = FAKE ; let p2_port__free = "port_1" ; { env__free . port (p2_port__free) . connect_local_blocking :: < ConnectedTagged < ConnectedDirect > > () . into_source () } }); 8v1 = map (| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: map_reduce :: Worker > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (std :: string :: String , i32) > (& b) . unwrap ()) }); 9v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: map_reduce :: Worker > , (std :: string :: String , i32)) , (std :: string :: String , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b })); 10v1 = reduce_keyed :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | total , count | * total += count })); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap index 4024ed117c8..5254228e054 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir@surface_graph_1.snap @@ -2,13 +2,13 @@ source: hydroflow_plus_test/src/cluster/map_reduce.rs expression: ir.surface_syntax_string() --- -1v1 = source_stream ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let c2_port__free = "port_0" ; let env__free = FAKE ; { env__free . port (c2_port__free) . connect_local_blocking :: < ConnectedDirect > () . into_source () } }); +1v1 = source_stream ({ use hydroflow_plus :: __staged :: deploy_runtime :: * ; let c2_port__free = "port_0" ; let env__free = FAKE ; { env__free . port (c2_port__free) . connect_local_blocking :: < ConnectedDirect > () . into_source () } }); 2v1 = map (| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () }); 3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < std :: string :: String , (std :: string :: String , ()) > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | string | (string , ()) })); 4v1 = fold_keyed :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | | 0 }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , () , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | count , _ | * count += 1 })); 5v1 = inspect (stageleft :: runtime_support :: fn1_borrow_type_hint :: < (std :: string :: String , i32) , () > ({ use crate :: __staged :: cluster :: map_reduce :: * ; | (string , count) | println ! ("partition count: {} - {}" , string , count) })); 6v1 = map (| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (std :: string :: String , i32) > (& data) . unwrap () . into () }); -7v1 = dest_sink ({ use hydroflow_plus :: __staged :: deploy :: deploy_runtime :: * ; let c1_port__free = "port_1" ; let env__free = FAKE ; { env__free . port (c1_port__free) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); +7v1 = dest_sink ({ use hydroflow_plus :: __staged :: deploy_runtime :: * ; let c1_port__free = "port_1" ; let env__free = FAKE ; { env__free . port (c1_port__free) . connect_local_blocking :: < ConnectedDirect > () . into_sink () } }); 1v1 -> 2v1; 2v1 -> 3v1; diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index c2c3c9e7119..8fdc53efeaa 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -134,26 +134,12 @@ expression: built.ir() 0, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }, ), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), @@ -252,26 +238,12 @@ expression: built.ir() 0, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }, ), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((ballot , max_ballot) , log) | (ballot . proposer_id , (ballot , if ballot == max_ballot { Ok (log) } else { Err (max_ballot) })) }), @@ -289,26 +261,12 @@ expression: built.ir() 1, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }, ), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), @@ -597,26 +555,12 @@ expression: built.ir() 0, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& b) . unwrap ()) }, ), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let CLUSTER_SELF_ID__free = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((key , value) , leader_id) | (leader_id , KvPayload { key , value : (CLUSTER_SELF_ID__free , value) }) }), @@ -775,26 +719,12 @@ expression: built.ir() 0, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > (& b) . unwrap ()) }, ), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , ((p2a . slot , p2a . ballot) , if p2a . ballot == max_ballot { Ok (()) } else { Err (max_ballot) })) }), @@ -811,26 +741,12 @@ expression: built.ir() 1, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > (& b) . unwrap ()) }, ), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), @@ -1059,26 +975,12 @@ expression: built.ir() 1, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , usize) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < usize > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , usize) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < usize > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < usize > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < usize > (& b) . unwrap ()) }, ), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), @@ -1166,26 +1068,12 @@ expression: built.ir() 3, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > (& b) . unwrap ()) }, ), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), @@ -1391,26 +1279,12 @@ expression: built.ir() 2, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }, ), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids__free = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids__free . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), @@ -1489,26 +1363,12 @@ expression: built.ir() 2, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , ((u32 , u32) , core :: result :: Result < () , () >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < ((u32 , u32) , core :: result :: Result < () , () >) > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , ((u32 , u32) , core :: result :: Result < () , () >)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < ((u32 , u32) , core :: result :: Result < () , () >) > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < ((u32 , u32) , core :: result :: Result < () , () >) > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < ((u32 , u32) , core :: result :: Result < () , () >) > (& b) . unwrap ()) }, ), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , ((u32 , u32) , core :: result :: Result < () , () >)) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . value . 0 , ((payload . key , payload . value . 1) , Ok (()))) }), diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap index e7083d8ccf6..b19ad045728 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap @@ -14,26 +14,12 @@ expression: built.ir() 0, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& data) . unwrap () . into () }", - ], - }, - ), + serialize_fn: Some( + | data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& data) . unwrap () . into () }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < () > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& b) . unwrap ()) }", - ], - }, - ), + deserialize_fn: Some( + | res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < () > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& b) . unwrap ()) }, ), input: Inspect { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) , () > ({ use crate :: __staged :: cluster :: simple_cluster :: * ; let CLUSTER_SELF_ID__free = hydroflow_plus :: ClusterId :: < () > :: from_raw (__hydroflow_plus_cluster_self_id_1) ; move | n | println ! ("cluster received: {:?} (self cluster id: {})" , n , CLUSTER_SELF_ID__free) }), @@ -46,26 +32,12 @@ expression: built.ir() 1, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& data) . unwrap () . into ()) }", - ], - }, - ), + serialize_fn: Some( + | (id , data) : (hydroflow_plus :: ClusterId < _ > , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32)) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& data) . unwrap () . into ()) }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& res . unwrap ()) . unwrap () }", - ], - }, - ), + deserialize_fn: Some( + | res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) > (& res . unwrap ()) . unwrap () }, ), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32) , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < () > , i32)) > ({ use crate :: __staged :: cluster :: simple_cluster :: * ; | (id , n) | (id , (id , n)) }), diff --git a/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap b/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap index fb73641059a..19c5d8d2806 100644 --- a/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap +++ b/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap @@ -16,17 +16,10 @@ expression: built.ir() 1, ), to_key: None, - serialize_pipeline: None, + serialize_fn: None, instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () }", - ], - }, - ), + deserialize_fn: Some( + | res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () }, ), input: Source { source: ExternalNetwork, @@ -47,26 +40,12 @@ expression: built.ir() 2, ), to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: distributed :: first_ten :: SendOverNetwork > (& data) . unwrap () . into () }", - ], - }, - ), + serialize_fn: Some( + | data | { hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: distributed :: first_ten :: SendOverNetwork > (& data) . unwrap () . into () }, ), instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: distributed :: first_ten :: SendOverNetwork > (& res . unwrap ()) . unwrap () }", - ], - }, - ), + deserialize_fn: Some( + | res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: distributed :: first_ten :: SendOverNetwork > (& res . unwrap ()) . unwrap () }, ), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: distributed :: first_ten :: SendOverNetwork > ({ use crate :: __staged :: distributed :: first_ten :: * ; | n | SendOverNetwork { n } }), diff --git a/hydroflow_plus_test_local/Cargo.toml b/hydroflow_plus_test_local/Cargo.toml index b8f301a0ce3..60307f1002c 100644 --- a/hydroflow_plus_test_local/Cargo.toml +++ b/hydroflow_plus_test_local/Cargo.toml @@ -12,7 +12,7 @@ stageleft_devel = [] [dependencies] hydroflow = { path = "../hydroflow", version = "^0.10.0", default-features = false } # , features = ["debugging"] } -hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0" } +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0", features = ["build"] } stageleft = { path = "../stageleft", version = "^0.5.0" } rand = "0.8.0" diff --git a/hydroflow_plus_test_local_macro/Cargo.toml b/hydroflow_plus_test_local_macro/Cargo.toml index 57da63e3239..693f44dcf31 100644 --- a/hydroflow_plus_test_local_macro/Cargo.toml +++ b/hydroflow_plus_test_local_macro/Cargo.toml @@ -12,7 +12,7 @@ proc-macro = true path = "../hydroflow_plus_test_local/src/lib.rs" [dependencies] -hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0" } +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0", features = ["build"] } stageleft = { path = "../stageleft", version = "^0.5.0" } rand = "0.8.0"