Skip to content

Commit

Permalink
Send small messages directly without shared memory
Browse files Browse the repository at this point in the history
We currently need an additional request to prepare zero-copy shared memory messages. For small messages, it might be faster to avoid this extra round-trip and send the message directly over TCP. This commit implements support for this. Messages smaller than a threshold (currently set to 4096 bytes) are sent via TCP, while larger messages still use shared memory.

This step also enables future optimizations such as queueing output messages in order to improve the throughput.
  • Loading branch information
phil-opp committed Mar 7, 2023
1 parent a845cc5 commit 00421e2
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 55 deletions.
47 changes: 29 additions & 18 deletions apis/rust/node/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,18 @@ impl ControlChannel {
}
}

pub fn send_empty_message(
pub fn send_message(
&mut self,
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
) -> eyre::Result<()> {
let reply = self
.channel
.request(&DaemonRequest::SendEmptyMessage {
.request(&DaemonRequest::SendMessage {
output_id,
metadata,
data,
})
.wrap_err("failed to send SendEmptyMessage request to dora-daemon")?;
match reply {
Expand Down Expand Up @@ -275,7 +277,7 @@ impl EventStream {
let drop_token = match &event {
NodeEvent::Input {
data: Some(data), ..
} => Some(data.drop_token.clone()),
} => data.drop_token(),
NodeEvent::Stop
| NodeEvent::InputClosed { .. }
| NodeEvent::Input { data: None, .. } => None,
Expand Down Expand Up @@ -350,18 +352,21 @@ impl EventStream {
NodeEvent::Stop => Event::Stop,
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let mapped = data
.map(|d| unsafe { MappedInputData::map(&d.shared_memory_id, d.len) })
let data = data
.map(|data| match data {
dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)),
dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe {
MappedInputData::map(&d.shared_memory_id, d.len).map(|data| {
Data::SharedMemory {
data,
_drop: ack_channel,
}
})
},
})
.transpose();
match mapped {
Ok(mapped) => Event::Input {
id,
metadata,
data: mapped.map(|data| Data {
data,
_drop: ack_channel,
}),
},
match data {
Ok(data) => Event::Input { id, metadata, data },
Err(err) => Event::Error(format!("{err:?}")),
}
}
Expand Down Expand Up @@ -394,16 +399,22 @@ pub enum Event<'a> {
Error(String),
}

pub struct Data<'a> {
data: MappedInputData<'a>,
_drop: std::sync::mpsc::Sender<()>,
pub enum Data<'a> {
Vec(Vec<u8>),
SharedMemory {
data: MappedInputData<'a>,
_drop: std::sync::mpsc::Sender<()>,
},
}

impl std::ops::Deref for Data<'_> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.data
match self {
Data::SharedMemory { data, .. } => data,
Data::Vec(data) => data,
}
}
}

Expand Down
9 changes: 6 additions & 3 deletions apis/rust/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use shared_memory_server::ShmemConf;

mod daemon;

const ZERO_COPY_THRESHOLD: usize = 4096;

pub struct DoraNode {
id: NodeId,
node_config: NodeRunConfig,
Expand Down Expand Up @@ -70,7 +72,7 @@ impl DoraNode {
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned());

if data_len > 0 {
if data_len >= ZERO_COPY_THRESHOLD {
let sample = self
.control_channel
.prepare_message(output_id.clone(), metadata, data_len)
Expand All @@ -88,9 +90,10 @@ impl DoraNode {
.send_prepared_message(sample)
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
} else {
data(&mut []);
let mut buffer = vec![0; data_len];
data(&mut buffer);
self.control_channel
.send_empty_message(output_id.clone(), metadata)
.send_message(output_id.clone(), metadata, buffer)
.wrap_err_with(|| format!("failed to send output {output_id}"))?;
}

Expand Down
104 changes: 77 additions & 27 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,20 +687,35 @@ impl Daemon {
let mut drop_tokens = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let drop_token = DropToken::generate();
let send_result = channel.send_async(daemon_messages::NodeEvent::Input {
let mut drop_token = None;
let item = daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: data.as_ref().map(|data| daemon_messages::InputData {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: drop_token.clone(),
}),
});
data: match &data {
Data::None => None,
Data::SharedMemory(data) => {
let token = DropToken::generate();
drop_token = Some(token);
Some(daemon_messages::InputData::SharedMemory(
daemon_messages::SharedMemoryInput {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: token,
},
))
}
Data::Vec(data) => {
Some(daemon_messages::InputData::Vec(data.clone()))
}
},
};
let send_result = channel.send_async(item);

match timeout(Duration::from_millis(10), send_result).await {
Ok(Ok(())) => {
drop_tokens.push(drop_token);
if let Some(token) = drop_token {
drop_tokens.push(token);
}
}
Ok(Err(_)) => {
closed.push(receiver_id);
Expand All @@ -716,25 +731,32 @@ impl Daemon {
for id in closed {
dataflow.subscribe_channels.remove(id);
}
let data_bytes = data.as_ref().map(|d| unsafe { d.as_slice() }.to_owned());

// report drop tokens to shared memory handler
if let Some(data) = data {
if let Err(err) = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await
.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");

let data_bytes = match data {
Data::SharedMemory(data) => {
let bytes = unsafe { data.as_slice() }.to_owned();

// report drop tokens to shared memory handler
let send_result = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await;
if let Err(err) =
send_result.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");
}

bytes
}
}
Data::Vec(data) => data,
Data::None => Vec::new(),
};

// TODO send `data` via network to all remove receivers
if let Some(data) = data_bytes {}
}
ShmemHandlerEvent::HandlerError(err) => {
bail!(err.wrap_err("shared memory handler failed"))
Expand Down Expand Up @@ -882,7 +904,7 @@ pub enum ShmemHandlerEvent {
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data: Option<Box<SharedMemSample>>,
data: Data,
},
HandlerError(eyre::ErrReport),
}
Expand All @@ -902,7 +924,14 @@ impl fmt::Debug for ShmemHandlerEvent {
.field("node_id", node_id)
.field("output_id", output_id)
.field("metadata", metadata)
.field("data", &data.as_ref().map(|_| "Some(..)").unwrap_or("None"))
.field(
"data",
match &data {
Data::None => &"None",
Data::SharedMemory(_) => &"SharedMemory(..)",
Data::Vec(_) => &"Vec(..)",
},
)
.finish(),
ShmemHandlerEvent::HandlerError(err) => {
f.debug_tuple("HandlerError").field(err).finish()
Expand All @@ -911,6 +940,27 @@ impl fmt::Debug for ShmemHandlerEvent {
}
}

pub enum Data {
None,
SharedMemory(Box<SharedMemSample>),
Vec(Vec<u8>),
}

impl From<Option<Box<SharedMemSample>>> for Data {
fn from(data: Option<Box<SharedMemSample>>) -> Self {
match data {
Some(data) => Self::SharedMemory(data),
None => Self::None,
}
}
}

impl From<Vec<u8>> for Data {
fn from(data: Vec<u8>) -> Self {
Self::Vec(data)
}
}

#[derive(Debug)]
pub enum DoraEvent {
Timer {
Expand Down
9 changes: 6 additions & 3 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ where
data: Some(data), ..
}) = self.queue.remove(index)
{
drop_tokens.push(data.drop_token);
if let Some(drop_token) = data.drop_token() {
drop_tokens.push(drop_token);
}
}
}
self.report_drop_tokens(drop_tokens).await?;
Expand Down Expand Up @@ -296,9 +298,10 @@ where
)
.await?;
}
DaemonRequest::SendEmptyMessage {
DaemonRequest::SendMessage {
output_id,
metadata,
data,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
Expand All @@ -307,7 +310,7 @@ where
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
data: data.into(),
});
let result = self
.send_daemon_event(event)
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/shared_mem_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl SharedMemHandler {
node_id,
output_id,
metadata,
data,
data: data.into(),
})
.await;
let _ = reply_sender.send(DaemonReply::Result(
Expand Down
22 changes: 19 additions & 3 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ pub enum DaemonRequest {
SendPreparedMessage {
id: SharedMemoryId,
},
SendEmptyMessage {
SendMessage {
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
},
CloseOutputs(Vec<DataId>),
Stopped,
Expand Down Expand Up @@ -87,7 +88,7 @@ pub struct DropEvent {
}

#[derive(
Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);

Expand All @@ -98,7 +99,22 @@ impl DropToken {
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct InputData {
pub enum InputData {
SharedMemory(SharedMemoryInput),
Vec(Vec<u8>),
}

impl InputData {
pub fn drop_token(&self) -> Option<DropToken> {
match self {
InputData::SharedMemory(data) => Some(data.drop_token),
InputData::Vec(_) => None,
}
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SharedMemoryInput {
pub shared_memory_id: SharedMemoryId,
pub len: usize,
pub drop_token: DropToken,
Expand Down

0 comments on commit 00421e2

Please sign in to comment.