diff --git a/apis/rust/node/src/daemon/mod.rs b/apis/rust/node/src/daemon/mod.rs index 2f1616aa8..270118fe3 100644 --- a/apis/rust/node/src/daemon/mod.rs +++ b/apis/rust/node/src/daemon/mod.rs @@ -145,16 +145,18 @@ impl ControlChannel { } } - pub fn send_empty_message( + pub fn send_message( &mut self, output_id: DataId, metadata: Metadata<'static>, + data: Vec, ) -> eyre::Result<()> { let reply = self .channel - .request(&DaemonRequest::SendEmptyMessage { + .request(&DaemonRequest::SendMessage { output_id, metadata, + data, }) .wrap_err("failed to send SendEmptyMessage request to dora-daemon")?; match reply { @@ -275,7 +277,7 @@ impl EventStream { let drop_token = match &event { NodeEvent::Input { data: Some(data), .. - } => Some(data.drop_token.clone()), + } => data.drop_token(), NodeEvent::Stop | NodeEvent::InputClosed { .. } | NodeEvent::Input { data: None, .. } => None, @@ -350,18 +352,21 @@ impl EventStream { NodeEvent::Stop => Event::Stop, NodeEvent::InputClosed { id } => Event::InputClosed { id }, NodeEvent::Input { id, metadata, data } => { - let mapped = data - .map(|d| unsafe { MappedInputData::map(&d.shared_memory_id, d.len) }) + let data = data + .map(|data| match data { + dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)), + dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe { + MappedInputData::map(&d.shared_memory_id, d.len).map(|data| { + Data::SharedMemory { + data, + _drop: ack_channel, + } + }) + }, + }) .transpose(); - match mapped { - Ok(mapped) => Event::Input { - id, - metadata, - data: mapped.map(|data| Data { - data, - _drop: ack_channel, - }), - }, + match data { + Ok(data) => Event::Input { id, metadata, data }, Err(err) => Event::Error(format!("{err:?}")), } } @@ -394,16 +399,22 @@ pub enum Event<'a> { Error(String), } -pub struct Data<'a> { - data: MappedInputData<'a>, - _drop: std::sync::mpsc::Sender<()>, +pub enum Data<'a> { + Vec(Vec), + SharedMemory { + data: MappedInputData<'a>, + _drop: std::sync::mpsc::Sender<()>, + }, } impl std::ops::Deref for Data<'_> { type Target = [u8]; fn deref(&self) -> &Self::Target { - &self.data + match self { + Data::SharedMemory { data, .. } => data, + Data::Vec(data) => data, + } } } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 4304215cd..0da003aaa 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -12,6 +12,8 @@ use shared_memory_server::ShmemConf; mod daemon; +const ZERO_COPY_THRESHOLD: usize = 4096; + pub struct DoraNode { id: NodeId, node_config: NodeRunConfig, @@ -70,7 +72,7 @@ impl DoraNode { } let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned()); - if data_len > 0 { + if data_len >= ZERO_COPY_THRESHOLD { let sample = self .control_channel .prepare_message(output_id.clone(), metadata, data_len) @@ -88,9 +90,10 @@ impl DoraNode { .send_prepared_message(sample) .wrap_err_with(|| format!("failed to send data for output {output_id}"))?; } else { - data(&mut []); + let mut buffer = vec![0; data_len]; + data(&mut buffer); self.control_channel - .send_empty_message(output_id.clone(), metadata) + .send_message(output_id.clone(), metadata, buffer) .wrap_err_with(|| format!("failed to send output {output_id}"))?; } diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 81bc96998..faf6dd323 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -687,20 +687,35 @@ impl Daemon { let mut drop_tokens = Vec::new(); for (receiver_id, input_id) in local_receivers { if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) { - let drop_token = DropToken::generate(); - let send_result = channel.send_async(daemon_messages::NodeEvent::Input { + let mut drop_token = None; + let item = daemon_messages::NodeEvent::Input { id: input_id.clone(), metadata: metadata.clone(), - data: data.as_ref().map(|data| daemon_messages::InputData { - shared_memory_id: data.get_os_id().to_owned(), - len: data.len(), - drop_token: drop_token.clone(), - }), - }); + data: match &data { + Data::None => None, + Data::SharedMemory(data) => { + let token = DropToken::generate(); + drop_token = Some(token); + Some(daemon_messages::InputData::SharedMemory( + daemon_messages::SharedMemoryInput { + shared_memory_id: data.get_os_id().to_owned(), + len: data.len(), + drop_token: token, + }, + )) + } + Data::Vec(data) => { + Some(daemon_messages::InputData::Vec(data.clone())) + } + }, + }; + let send_result = channel.send_async(item); match timeout(Duration::from_millis(10), send_result).await { Ok(Ok(())) => { - drop_tokens.push(drop_token); + if let Some(token) = drop_token { + drop_tokens.push(token); + } } Ok(Err(_)) => { closed.push(receiver_id); @@ -716,25 +731,32 @@ impl Daemon { for id in closed { dataflow.subscribe_channels.remove(id); } - let data_bytes = data.as_ref().map(|d| unsafe { d.as_slice() }.to_owned()); - - // report drop tokens to shared memory handler - if let Some(data) = data { - if let Err(err) = self - .shared_memory_handler - .send_async(shared_mem_handler::DaemonEvent::SentOut { - data: *data, - drop_tokens, - }) - .await - .wrap_err("shared mem handler crashed after send out") - { - tracing::error!("{err:?}"); + + let data_bytes = match data { + Data::SharedMemory(data) => { + let bytes = unsafe { data.as_slice() }.to_owned(); + + // report drop tokens to shared memory handler + let send_result = self + .shared_memory_handler + .send_async(shared_mem_handler::DaemonEvent::SentOut { + data: *data, + drop_tokens, + }) + .await; + if let Err(err) = + send_result.wrap_err("shared mem handler crashed after send out") + { + tracing::error!("{err:?}"); + } + + bytes } - } + Data::Vec(data) => data, + Data::None => Vec::new(), + }; // TODO send `data` via network to all remove receivers - if let Some(data) = data_bytes {} } ShmemHandlerEvent::HandlerError(err) => { bail!(err.wrap_err("shared memory handler failed")) @@ -882,7 +904,7 @@ pub enum ShmemHandlerEvent { node_id: NodeId, output_id: DataId, metadata: dora_core::message::Metadata<'static>, - data: Option>, + data: Data, }, HandlerError(eyre::ErrReport), } @@ -902,7 +924,14 @@ impl fmt::Debug for ShmemHandlerEvent { .field("node_id", node_id) .field("output_id", output_id) .field("metadata", metadata) - .field("data", &data.as_ref().map(|_| "Some(..)").unwrap_or("None")) + .field( + "data", + match &data { + Data::None => &"None", + Data::SharedMemory(_) => &"SharedMemory(..)", + Data::Vec(_) => &"Vec(..)", + }, + ) .finish(), ShmemHandlerEvent::HandlerError(err) => { f.debug_tuple("HandlerError").field(err).finish() @@ -911,6 +940,27 @@ impl fmt::Debug for ShmemHandlerEvent { } } +pub enum Data { + None, + SharedMemory(Box), + Vec(Vec), +} + +impl From>> for Data { + fn from(data: Option>) -> Self { + match data { + Some(data) => Self::SharedMemory(data), + None => Self::None, + } + } +} + +impl From> for Data { + fn from(data: Vec) -> Self { + Self::Vec(data) + } +} + #[derive(Debug)] pub enum DoraEvent { Timer { diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs index 3bec44ef6..58345446c 100644 --- a/binaries/daemon/src/listener/mod.rs +++ b/binaries/daemon/src/listener/mod.rs @@ -241,7 +241,9 @@ where data: Some(data), .. }) = self.queue.remove(index) { - drop_tokens.push(data.drop_token); + if let Some(drop_token) = data.drop_token() { + drop_tokens.push(drop_token); + } } } self.report_drop_tokens(drop_tokens).await?; @@ -296,9 +298,10 @@ where ) .await?; } - DaemonRequest::SendEmptyMessage { + DaemonRequest::SendMessage { output_id, metadata, + data, } => { // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?; // tracing::debug!("listener SendEmptyMessage: {elapsed:?}"); @@ -307,7 +310,7 @@ where node_id: self.node_id.clone(), output_id, metadata, - data: None, + data: data.into(), }); let result = self .send_daemon_event(event) diff --git a/binaries/daemon/src/shared_mem_handler.rs b/binaries/daemon/src/shared_mem_handler.rs index 2f6c32911..b3d4f877f 100644 --- a/binaries/daemon/src/shared_mem_handler.rs +++ b/binaries/daemon/src/shared_mem_handler.rs @@ -181,7 +181,7 @@ impl SharedMemHandler { node_id, output_id, metadata, - data, + data: data.into(), }) .await; let _ = reply_sender.send(DaemonReply::Result( diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 8807baa46..f527efc5d 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -47,9 +47,10 @@ pub enum DaemonRequest { SendPreparedMessage { id: SharedMemoryId, }, - SendEmptyMessage { + SendMessage { output_id: DataId, metadata: Metadata<'static>, + data: Vec, }, CloseOutputs(Vec), Stopped, @@ -87,7 +88,7 @@ pub struct DropEvent { } #[derive( - Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, )] pub struct DropToken(Uuid); @@ -98,7 +99,22 @@ impl DropToken { } #[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct InputData { +pub enum InputData { + SharedMemory(SharedMemoryInput), + Vec(Vec), +} + +impl InputData { + pub fn drop_token(&self) -> Option { + match self { + InputData::SharedMemory(data) => Some(data.drop_token), + InputData::Vec(_) => None, + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct SharedMemoryInput { pub shared_memory_id: SharedMemoryId, pub len: usize, pub drop_token: DropToken,