Skip to content

Commit

Permalink
Merge pull request #193 from dora-rs/copy-messages
Browse files Browse the repository at this point in the history
Send small messages directly without shared memory
  • Loading branch information
phil-opp authored Mar 7, 2023
2 parents a845cc5 + 00421e2 commit df4d8c5
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 55 deletions.
47 changes: 29 additions & 18 deletions apis/rust/node/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,18 @@ impl ControlChannel {
}
}

pub fn send_empty_message(
pub fn send_message(
&mut self,
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
) -> eyre::Result<()> {
let reply = self
.channel
.request(&DaemonRequest::SendEmptyMessage {
.request(&DaemonRequest::SendMessage {
output_id,
metadata,
data,
})
.wrap_err("failed to send SendEmptyMessage request to dora-daemon")?;
match reply {
Expand Down Expand Up @@ -275,7 +277,7 @@ impl EventStream {
let drop_token = match &event {
NodeEvent::Input {
data: Some(data), ..
} => Some(data.drop_token.clone()),
} => data.drop_token(),
NodeEvent::Stop
| NodeEvent::InputClosed { .. }
| NodeEvent::Input { data: None, .. } => None,
Expand Down Expand Up @@ -350,18 +352,21 @@ impl EventStream {
NodeEvent::Stop => Event::Stop,
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let mapped = data
.map(|d| unsafe { MappedInputData::map(&d.shared_memory_id, d.len) })
let data = data
.map(|data| match data {
dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)),
dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe {
MappedInputData::map(&d.shared_memory_id, d.len).map(|data| {
Data::SharedMemory {
data,
_drop: ack_channel,
}
})
},
})
.transpose();
match mapped {
Ok(mapped) => Event::Input {
id,
metadata,
data: mapped.map(|data| Data {
data,
_drop: ack_channel,
}),
},
match data {
Ok(data) => Event::Input { id, metadata, data },
Err(err) => Event::Error(format!("{err:?}")),
}
}
Expand Down Expand Up @@ -394,16 +399,22 @@ pub enum Event<'a> {
Error(String),
}

pub struct Data<'a> {
data: MappedInputData<'a>,
_drop: std::sync::mpsc::Sender<()>,
pub enum Data<'a> {
Vec(Vec<u8>),
SharedMemory {
data: MappedInputData<'a>,
_drop: std::sync::mpsc::Sender<()>,
},
}

impl std::ops::Deref for Data<'_> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.data
match self {
Data::SharedMemory { data, .. } => data,
Data::Vec(data) => data,
}
}
}

Expand Down
9 changes: 6 additions & 3 deletions apis/rust/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use shared_memory_server::ShmemConf;

mod daemon;

const ZERO_COPY_THRESHOLD: usize = 4096;

pub struct DoraNode {
id: NodeId,
node_config: NodeRunConfig,
Expand Down Expand Up @@ -70,7 +72,7 @@ impl DoraNode {
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned());

if data_len > 0 {
if data_len >= ZERO_COPY_THRESHOLD {
let sample = self
.control_channel
.prepare_message(output_id.clone(), metadata, data_len)
Expand All @@ -88,9 +90,10 @@ impl DoraNode {
.send_prepared_message(sample)
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
} else {
data(&mut []);
let mut buffer = vec![0; data_len];
data(&mut buffer);
self.control_channel
.send_empty_message(output_id.clone(), metadata)
.send_message(output_id.clone(), metadata, buffer)
.wrap_err_with(|| format!("failed to send output {output_id}"))?;
}

Expand Down
104 changes: 77 additions & 27 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,20 +687,35 @@ impl Daemon {
let mut drop_tokens = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let drop_token = DropToken::generate();
let send_result = channel.send_async(daemon_messages::NodeEvent::Input {
let mut drop_token = None;
let item = daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: data.as_ref().map(|data| daemon_messages::InputData {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: drop_token.clone(),
}),
});
data: match &data {
Data::None => None,
Data::SharedMemory(data) => {
let token = DropToken::generate();
drop_token = Some(token);
Some(daemon_messages::InputData::SharedMemory(
daemon_messages::SharedMemoryInput {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: token,
},
))
}
Data::Vec(data) => {
Some(daemon_messages::InputData::Vec(data.clone()))
}
},
};
let send_result = channel.send_async(item);

match timeout(Duration::from_millis(10), send_result).await {
Ok(Ok(())) => {
drop_tokens.push(drop_token);
if let Some(token) = drop_token {
drop_tokens.push(token);
}
}
Ok(Err(_)) => {
closed.push(receiver_id);
Expand All @@ -716,25 +731,32 @@ impl Daemon {
for id in closed {
dataflow.subscribe_channels.remove(id);
}
let data_bytes = data.as_ref().map(|d| unsafe { d.as_slice() }.to_owned());

// report drop tokens to shared memory handler
if let Some(data) = data {
if let Err(err) = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await
.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");

let data_bytes = match data {
Data::SharedMemory(data) => {
let bytes = unsafe { data.as_slice() }.to_owned();

// report drop tokens to shared memory handler
let send_result = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await;
if let Err(err) =
send_result.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");
}

bytes
}
}
Data::Vec(data) => data,
Data::None => Vec::new(),
};

// TODO send `data` via network to all remove receivers
if let Some(data) = data_bytes {}
}
ShmemHandlerEvent::HandlerError(err) => {
bail!(err.wrap_err("shared memory handler failed"))
Expand Down Expand Up @@ -882,7 +904,7 @@ pub enum ShmemHandlerEvent {
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data: Option<Box<SharedMemSample>>,
data: Data,
},
HandlerError(eyre::ErrReport),
}
Expand All @@ -902,7 +924,14 @@ impl fmt::Debug for ShmemHandlerEvent {
.field("node_id", node_id)
.field("output_id", output_id)
.field("metadata", metadata)
.field("data", &data.as_ref().map(|_| "Some(..)").unwrap_or("None"))
.field(
"data",
match &data {
Data::None => &"None",
Data::SharedMemory(_) => &"SharedMemory(..)",
Data::Vec(_) => &"Vec(..)",
},
)
.finish(),
ShmemHandlerEvent::HandlerError(err) => {
f.debug_tuple("HandlerError").field(err).finish()
Expand All @@ -911,6 +940,27 @@ impl fmt::Debug for ShmemHandlerEvent {
}
}

pub enum Data {
None,
SharedMemory(Box<SharedMemSample>),
Vec(Vec<u8>),
}

impl From<Option<Box<SharedMemSample>>> for Data {
fn from(data: Option<Box<SharedMemSample>>) -> Self {
match data {
Some(data) => Self::SharedMemory(data),
None => Self::None,
}
}
}

impl From<Vec<u8>> for Data {
fn from(data: Vec<u8>) -> Self {
Self::Vec(data)
}
}

#[derive(Debug)]
pub enum DoraEvent {
Timer {
Expand Down
9 changes: 6 additions & 3 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ where
data: Some(data), ..
}) = self.queue.remove(index)
{
drop_tokens.push(data.drop_token);
if let Some(drop_token) = data.drop_token() {
drop_tokens.push(drop_token);
}
}
}
self.report_drop_tokens(drop_tokens).await?;
Expand Down Expand Up @@ -296,9 +298,10 @@ where
)
.await?;
}
DaemonRequest::SendEmptyMessage {
DaemonRequest::SendMessage {
output_id,
metadata,
data,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
Expand All @@ -307,7 +310,7 @@ where
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
data: data.into(),
});
let result = self
.send_daemon_event(event)
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/shared_mem_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl SharedMemHandler {
node_id,
output_id,
metadata,
data,
data: data.into(),
})
.await;
let _ = reply_sender.send(DaemonReply::Result(
Expand Down
22 changes: 19 additions & 3 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ pub enum DaemonRequest {
SendPreparedMessage {
id: SharedMemoryId,
},
SendEmptyMessage {
SendMessage {
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
},
CloseOutputs(Vec<DataId>),
Stopped,
Expand Down Expand Up @@ -87,7 +88,7 @@ pub struct DropEvent {
}

#[derive(
Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);

Expand All @@ -98,7 +99,22 @@ impl DropToken {
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct InputData {
pub enum InputData {
SharedMemory(SharedMemoryInput),
Vec(Vec<u8>),
}

impl InputData {
pub fn drop_token(&self) -> Option<DropToken> {
match self {
InputData::SharedMemory(data) => Some(data.drop_token),
InputData::Vec(_) => None,
}
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SharedMemoryInput {
pub shared_memory_id: SharedMemoryId,
pub len: usize,
pub drop_token: DropToken,
Expand Down

0 comments on commit df4d8c5

Please sign in to comment.