Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send small messages directly without shared memory #193

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions apis/rust/node/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,18 @@ impl ControlChannel {
}
}

pub fn send_empty_message(
pub fn send_message(
&mut self,
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
) -> eyre::Result<()> {
let reply = self
.channel
.request(&DaemonRequest::SendEmptyMessage {
.request(&DaemonRequest::SendMessage {
output_id,
metadata,
data,
})
.wrap_err("failed to send SendEmptyMessage request to dora-daemon")?;
match reply {
Expand Down Expand Up @@ -275,7 +277,7 @@ impl EventStream {
let drop_token = match &event {
NodeEvent::Input {
data: Some(data), ..
} => Some(data.drop_token.clone()),
} => data.drop_token(),
NodeEvent::Stop
| NodeEvent::InputClosed { .. }
| NodeEvent::Input { data: None, .. } => None,
Expand Down Expand Up @@ -350,18 +352,21 @@ impl EventStream {
NodeEvent::Stop => Event::Stop,
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let mapped = data
.map(|d| unsafe { MappedInputData::map(&d.shared_memory_id, d.len) })
let data = data
.map(|data| match data {
dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)),
dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe {
MappedInputData::map(&d.shared_memory_id, d.len).map(|data| {
Data::SharedMemory {
data,
_drop: ack_channel,
}
})
},
})
.transpose();
match mapped {
Ok(mapped) => Event::Input {
id,
metadata,
data: mapped.map(|data| Data {
data,
_drop: ack_channel,
}),
},
match data {
Ok(data) => Event::Input { id, metadata, data },
Err(err) => Event::Error(format!("{err:?}")),
}
}
Expand Down Expand Up @@ -394,16 +399,22 @@ pub enum Event<'a> {
Error(String),
}

pub struct Data<'a> {
data: MappedInputData<'a>,
_drop: std::sync::mpsc::Sender<()>,
pub enum Data<'a> {
Vec(Vec<u8>),
SharedMemory {
data: MappedInputData<'a>,
_drop: std::sync::mpsc::Sender<()>,
},
}

impl std::ops::Deref for Data<'_> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.data
match self {
Data::SharedMemory { data, .. } => data,
Data::Vec(data) => data,
}
}
}

Expand Down
9 changes: 6 additions & 3 deletions apis/rust/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use shared_memory_server::ShmemConf;

mod daemon;

const ZERO_COPY_THRESHOLD: usize = 4096;

pub struct DoraNode {
id: NodeId,
node_config: NodeRunConfig,
Expand Down Expand Up @@ -70,7 +72,7 @@ impl DoraNode {
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned());

if data_len > 0 {
if data_len >= ZERO_COPY_THRESHOLD {
let sample = self
.control_channel
.prepare_message(output_id.clone(), metadata, data_len)
Expand All @@ -88,9 +90,10 @@ impl DoraNode {
.send_prepared_message(sample)
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
} else {
data(&mut []);
let mut buffer = vec![0; data_len];
phil-opp marked this conversation as resolved.
Show resolved Hide resolved
data(&mut buffer);
self.control_channel
.send_empty_message(output_id.clone(), metadata)
.send_message(output_id.clone(), metadata, buffer)
.wrap_err_with(|| format!("failed to send output {output_id}"))?;
}

Expand Down
104 changes: 77 additions & 27 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,20 +687,35 @@ impl Daemon {
let mut drop_tokens = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let drop_token = DropToken::generate();
let send_result = channel.send_async(daemon_messages::NodeEvent::Input {
let mut drop_token = None;
let item = daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: data.as_ref().map(|data| daemon_messages::InputData {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: drop_token.clone(),
}),
});
data: match &data {
Data::None => None,
Data::SharedMemory(data) => {
let token = DropToken::generate();
drop_token = Some(token);
Some(daemon_messages::InputData::SharedMemory(
daemon_messages::SharedMemoryInput {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: token,
},
))
}
Data::Vec(data) => {
Some(daemon_messages::InputData::Vec(data.clone()))
}
},
};
let send_result = channel.send_async(item);

match timeout(Duration::from_millis(10), send_result).await {
Ok(Ok(())) => {
drop_tokens.push(drop_token);
if let Some(token) = drop_token {
drop_tokens.push(token);
}
}
Ok(Err(_)) => {
closed.push(receiver_id);
Expand All @@ -716,25 +731,32 @@ impl Daemon {
for id in closed {
dataflow.subscribe_channels.remove(id);
}
let data_bytes = data.as_ref().map(|d| unsafe { d.as_slice() }.to_owned());

// report drop tokens to shared memory handler
if let Some(data) = data {
if let Err(err) = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await
.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");

let data_bytes = match data {
Data::SharedMemory(data) => {
let bytes = unsafe { data.as_slice() }.to_owned();

// report drop tokens to shared memory handler
let send_result = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await;
if let Err(err) =
send_result.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");
}

bytes
}
}
Data::Vec(data) => data,
Data::None => Vec::new(),
};

// TODO send `data` via network to all remove receivers
if let Some(data) = data_bytes {}
}
ShmemHandlerEvent::HandlerError(err) => {
bail!(err.wrap_err("shared memory handler failed"))
Expand Down Expand Up @@ -882,7 +904,7 @@ pub enum ShmemHandlerEvent {
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data: Option<Box<SharedMemSample>>,
data: Data,
},
HandlerError(eyre::ErrReport),
}
Expand All @@ -902,7 +924,14 @@ impl fmt::Debug for ShmemHandlerEvent {
.field("node_id", node_id)
.field("output_id", output_id)
.field("metadata", metadata)
.field("data", &data.as_ref().map(|_| "Some(..)").unwrap_or("None"))
.field(
"data",
match &data {
Data::None => &"None",
Data::SharedMemory(_) => &"SharedMemory(..)",
Data::Vec(_) => &"Vec(..)",
},
)
.finish(),
ShmemHandlerEvent::HandlerError(err) => {
f.debug_tuple("HandlerError").field(err).finish()
Expand All @@ -911,6 +940,27 @@ impl fmt::Debug for ShmemHandlerEvent {
}
}

pub enum Data {
None,
SharedMemory(Box<SharedMemSample>),
Vec(Vec<u8>),
}

impl From<Option<Box<SharedMemSample>>> for Data {
fn from(data: Option<Box<SharedMemSample>>) -> Self {
match data {
Some(data) => Self::SharedMemory(data),
None => Self::None,
}
}
}

impl From<Vec<u8>> for Data {
fn from(data: Vec<u8>) -> Self {
Self::Vec(data)
}
}

#[derive(Debug)]
pub enum DoraEvent {
Timer {
Expand Down
9 changes: 6 additions & 3 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ where
data: Some(data), ..
}) = self.queue.remove(index)
{
drop_tokens.push(data.drop_token);
if let Some(drop_token) = data.drop_token() {
drop_tokens.push(drop_token);
}
}
}
self.report_drop_tokens(drop_tokens).await?;
Expand Down Expand Up @@ -296,9 +298,10 @@ where
)
.await?;
}
DaemonRequest::SendEmptyMessage {
DaemonRequest::SendMessage {
output_id,
metadata,
data,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
Expand All @@ -307,7 +310,7 @@ where
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
data: data.into(),
});
let result = self
.send_daemon_event(event)
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/shared_mem_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl SharedMemHandler {
node_id,
output_id,
metadata,
data,
data: data.into(),
})
.await;
let _ = reply_sender.send(DaemonReply::Result(
Expand Down
22 changes: 19 additions & 3 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ pub enum DaemonRequest {
SendPreparedMessage {
id: SharedMemoryId,
},
SendEmptyMessage {
SendMessage {
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
},
CloseOutputs(Vec<DataId>),
Stopped,
Expand Down Expand Up @@ -87,7 +88,7 @@ pub struct DropEvent {
}

#[derive(
Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);

Expand All @@ -98,7 +99,22 @@ impl DropToken {
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct InputData {
pub enum InputData {
SharedMemory(SharedMemoryInput),
Vec(Vec<u8>),
}

impl InputData {
pub fn drop_token(&self) -> Option<DropToken> {
match self {
InputData::SharedMemory(data) => Some(data.drop_token),
InputData::Vec(_) => None,
}
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SharedMemoryInput {
pub shared_memory_id: SharedMemoryId,
pub len: usize,
pub drop_token: DropToken,
Expand Down