From 00421e2bcdbcc52263f7d80c54aceb2a754fdf44 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 7 Mar 2023 11:49:59 +0100 Subject: [PATCH] Send small messages directly without shared memory We currently need an additional request to prepare zero-copy shared memory messages. For small messages, it might be faster to avoid this extra round-trip and send the message directly over TCP. This commit implements support for this. Messages smaller than a threshold (currently set to 4096 bytes) are sent via TCP, while larger messages still use shared memory. This step also enables future optimizations such as queueing output messages in order to improve the throughput. --- apis/rust/node/src/daemon/mod.rs | 47 ++++++---- apis/rust/node/src/lib.rs | 9 +- binaries/daemon/src/lib.rs | 104 ++++++++++++++++------ binaries/daemon/src/listener/mod.rs | 9 +- binaries/daemon/src/shared_mem_handler.rs | 2 +- libraries/core/src/daemon_messages.rs | 22 ++++- 6 files changed, 138 insertions(+), 55 deletions(-) diff --git a/apis/rust/node/src/daemon/mod.rs b/apis/rust/node/src/daemon/mod.rs index 2f1616aa..270118fe 100644 --- a/apis/rust/node/src/daemon/mod.rs +++ b/apis/rust/node/src/daemon/mod.rs @@ -145,16 +145,18 @@ impl ControlChannel { } } - pub fn send_empty_message( + pub fn send_message( &mut self, output_id: DataId, metadata: Metadata<'static>, + data: Vec, ) -> eyre::Result<()> { let reply = self .channel - .request(&DaemonRequest::SendEmptyMessage { + .request(&DaemonRequest::SendMessage { output_id, metadata, + data, }) .wrap_err("failed to send SendEmptyMessage request to dora-daemon")?; match reply { @@ -275,7 +277,7 @@ impl EventStream { let drop_token = match &event { NodeEvent::Input { data: Some(data), .. - } => Some(data.drop_token.clone()), + } => data.drop_token(), NodeEvent::Stop | NodeEvent::InputClosed { .. } | NodeEvent::Input { data: None, .. } => None, @@ -350,18 +352,21 @@ impl EventStream { NodeEvent::Stop => Event::Stop, NodeEvent::InputClosed { id } => Event::InputClosed { id }, NodeEvent::Input { id, metadata, data } => { - let mapped = data - .map(|d| unsafe { MappedInputData::map(&d.shared_memory_id, d.len) }) + let data = data + .map(|data| match data { + dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)), + dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe { + MappedInputData::map(&d.shared_memory_id, d.len).map(|data| { + Data::SharedMemory { + data, + _drop: ack_channel, + } + }) + }, + }) .transpose(); - match mapped { - Ok(mapped) => Event::Input { - id, - metadata, - data: mapped.map(|data| Data { - data, - _drop: ack_channel, - }), - }, + match data { + Ok(data) => Event::Input { id, metadata, data }, Err(err) => Event::Error(format!("{err:?}")), } } @@ -394,16 +399,22 @@ pub enum Event<'a> { Error(String), } -pub struct Data<'a> { - data: MappedInputData<'a>, - _drop: std::sync::mpsc::Sender<()>, +pub enum Data<'a> { + Vec(Vec), + SharedMemory { + data: MappedInputData<'a>, + _drop: std::sync::mpsc::Sender<()>, + }, } impl std::ops::Deref for Data<'_> { type Target = [u8]; fn deref(&self) -> &Self::Target { - &self.data + match self { + Data::SharedMemory { data, .. } => data, + Data::Vec(data) => data, + } } } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 4304215c..0da003aa 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -12,6 +12,8 @@ use shared_memory_server::ShmemConf; mod daemon; +const ZERO_COPY_THRESHOLD: usize = 4096; + pub struct DoraNode { id: NodeId, node_config: NodeRunConfig, @@ -70,7 +72,7 @@ impl DoraNode { } let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned()); - if data_len > 0 { + if data_len >= ZERO_COPY_THRESHOLD { let sample = self .control_channel .prepare_message(output_id.clone(), metadata, data_len) @@ -88,9 +90,10 @@ impl DoraNode { .send_prepared_message(sample) .wrap_err_with(|| format!("failed to send data for output {output_id}"))?; } else { - data(&mut []); + let mut buffer = vec![0; data_len]; + data(&mut buffer); self.control_channel - .send_empty_message(output_id.clone(), metadata) + .send_message(output_id.clone(), metadata, buffer) .wrap_err_with(|| format!("failed to send output {output_id}"))?; } diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 81bc9699..faf6dd32 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -687,20 +687,35 @@ impl Daemon { let mut drop_tokens = Vec::new(); for (receiver_id, input_id) in local_receivers { if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) { - let drop_token = DropToken::generate(); - let send_result = channel.send_async(daemon_messages::NodeEvent::Input { + let mut drop_token = None; + let item = daemon_messages::NodeEvent::Input { id: input_id.clone(), metadata: metadata.clone(), - data: data.as_ref().map(|data| daemon_messages::InputData { - shared_memory_id: data.get_os_id().to_owned(), - len: data.len(), - drop_token: drop_token.clone(), - }), - }); + data: match &data { + Data::None => None, + Data::SharedMemory(data) => { + let token = DropToken::generate(); + drop_token = Some(token); + Some(daemon_messages::InputData::SharedMemory( + daemon_messages::SharedMemoryInput { + shared_memory_id: data.get_os_id().to_owned(), + len: data.len(), + drop_token: token, + }, + )) + } + Data::Vec(data) => { + Some(daemon_messages::InputData::Vec(data.clone())) + } + }, + }; + let send_result = channel.send_async(item); match timeout(Duration::from_millis(10), send_result).await { Ok(Ok(())) => { - drop_tokens.push(drop_token); + if let Some(token) = drop_token { + drop_tokens.push(token); + } } Ok(Err(_)) => { closed.push(receiver_id); @@ -716,25 +731,32 @@ impl Daemon { for id in closed { dataflow.subscribe_channels.remove(id); } - let data_bytes = data.as_ref().map(|d| unsafe { d.as_slice() }.to_owned()); - - // report drop tokens to shared memory handler - if let Some(data) = data { - if let Err(err) = self - .shared_memory_handler - .send_async(shared_mem_handler::DaemonEvent::SentOut { - data: *data, - drop_tokens, - }) - .await - .wrap_err("shared mem handler crashed after send out") - { - tracing::error!("{err:?}"); + + let data_bytes = match data { + Data::SharedMemory(data) => { + let bytes = unsafe { data.as_slice() }.to_owned(); + + // report drop tokens to shared memory handler + let send_result = self + .shared_memory_handler + .send_async(shared_mem_handler::DaemonEvent::SentOut { + data: *data, + drop_tokens, + }) + .await; + if let Err(err) = + send_result.wrap_err("shared mem handler crashed after send out") + { + tracing::error!("{err:?}"); + } + + bytes } - } + Data::Vec(data) => data, + Data::None => Vec::new(), + }; // TODO send `data` via network to all remove receivers - if let Some(data) = data_bytes {} } ShmemHandlerEvent::HandlerError(err) => { bail!(err.wrap_err("shared memory handler failed")) @@ -882,7 +904,7 @@ pub enum ShmemHandlerEvent { node_id: NodeId, output_id: DataId, metadata: dora_core::message::Metadata<'static>, - data: Option>, + data: Data, }, HandlerError(eyre::ErrReport), } @@ -902,7 +924,14 @@ impl fmt::Debug for ShmemHandlerEvent { .field("node_id", node_id) .field("output_id", output_id) .field("metadata", metadata) - .field("data", &data.as_ref().map(|_| "Some(..)").unwrap_or("None")) + .field( + "data", + match &data { + Data::None => &"None", + Data::SharedMemory(_) => &"SharedMemory(..)", + Data::Vec(_) => &"Vec(..)", + }, + ) .finish(), ShmemHandlerEvent::HandlerError(err) => { f.debug_tuple("HandlerError").field(err).finish() @@ -911,6 +940,27 @@ impl fmt::Debug for ShmemHandlerEvent { } } +pub enum Data { + None, + SharedMemory(Box), + Vec(Vec), +} + +impl From>> for Data { + fn from(data: Option>) -> Self { + match data { + Some(data) => Self::SharedMemory(data), + None => Self::None, + } + } +} + +impl From> for Data { + fn from(data: Vec) -> Self { + Self::Vec(data) + } +} + #[derive(Debug)] pub enum DoraEvent { Timer { diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs index 3bec44ef..58345446 100644 --- a/binaries/daemon/src/listener/mod.rs +++ b/binaries/daemon/src/listener/mod.rs @@ -241,7 +241,9 @@ where data: Some(data), .. }) = self.queue.remove(index) { - drop_tokens.push(data.drop_token); + if let Some(drop_token) = data.drop_token() { + drop_tokens.push(drop_token); + } } } self.report_drop_tokens(drop_tokens).await?; @@ -296,9 +298,10 @@ where ) .await?; } - DaemonRequest::SendEmptyMessage { + DaemonRequest::SendMessage { output_id, metadata, + data, } => { // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?; // tracing::debug!("listener SendEmptyMessage: {elapsed:?}"); @@ -307,7 +310,7 @@ where node_id: self.node_id.clone(), output_id, metadata, - data: None, + data: data.into(), }); let result = self .send_daemon_event(event) diff --git a/binaries/daemon/src/shared_mem_handler.rs b/binaries/daemon/src/shared_mem_handler.rs index 2f6c3291..b3d4f877 100644 --- a/binaries/daemon/src/shared_mem_handler.rs +++ b/binaries/daemon/src/shared_mem_handler.rs @@ -181,7 +181,7 @@ impl SharedMemHandler { node_id, output_id, metadata, - data, + data: data.into(), }) .await; let _ = reply_sender.send(DaemonReply::Result( diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 8807baa4..f527efc5 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -47,9 +47,10 @@ pub enum DaemonRequest { SendPreparedMessage { id: SharedMemoryId, }, - SendEmptyMessage { + SendMessage { output_id: DataId, metadata: Metadata<'static>, + data: Vec, }, CloseOutputs(Vec), Stopped, @@ -87,7 +88,7 @@ pub struct DropEvent { } #[derive( - Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, )] pub struct DropToken(Uuid); @@ -98,7 +99,22 @@ impl DropToken { } #[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct InputData { +pub enum InputData { + SharedMemory(SharedMemoryInput), + Vec(Vec), +} + +impl InputData { + pub fn drop_token(&self) -> Option { + match self { + InputData::SharedMemory(data) => Some(data.drop_token), + InputData::Vec(_) => None, + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct SharedMemoryInput { pub shared_memory_id: SharedMemoryId, pub len: usize, pub drop_token: DropToken,