Avoid blocking the daemon main loop by using unbounded queue #230

Merged: 2 commits, merged Mar 22, 2023
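The gist of the change: the daemon used to push node events into bounded flume channels and await each send behind a short timeout, so a slow or stalled receiver could hold up the single daemon event loop. tokio's unbounded channel makes send a plain non-blocking call. Below is a minimal standalone sketch of the two patterns, assuming tokio and flume as dependencies; the NodeEvent name and the 10 ms timeout echo the diff, everything else is illustrative.

// Sketch contrasting the old and new send paths (not the actual daemon code).
// Assumes tokio = { version = "1", features = ["full"] } and flume = "0.10".
use std::time::Duration;
use tokio::time::timeout;

#[derive(Debug)]
enum NodeEvent {
    Stop,
}

#[tokio::main]
async fn main() {
    // Old pattern: bounded queue, so a full queue forces the sender to wait
    // (here capped at 10 ms, after which the event is dropped).
    let (tx, rx) = flume::bounded::<NodeEvent>(1);
    tx.send(NodeEvent::Stop).unwrap(); // fill the queue
    let send_result = tx.send_async(NodeEvent::Stop);
    if timeout(Duration::from_millis(10), send_result).await.is_err() {
        println!("old path: send timed out, event dropped");
    }
    drop(rx);

    // New pattern: unbounded queue, `send` is synchronous and never blocks.
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<NodeEvent>();
    tx.send(NodeEvent::Stop).expect("receiver still alive");
    println!("new path: got {:?}", rx.recv().await.unwrap());
}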
67 changes: 22 additions & 45 deletions binaries/daemon/src/lib.rs
@@ -23,10 +23,10 @@ use std::{
     time::Duration,
 };
 use tcp_utils::tcp_receive;
+use tokio::sync::mpsc::UnboundedSender;
 use tokio::{
     fs,
     sync::{mpsc, oneshot},
-    time::timeout,
 };
 use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
 use uuid::Uuid;
@@ -461,10 +461,8 @@ impl Daemon {
                         metadata: metadata.clone(),
                         data: data.clone(),
                     };
-                    let send_result = channel.send_async(item);
-
-                    match timeout(Duration::from_millis(10), send_result).await {
-                        Ok(Ok(())) => {
+                    match channel.send(item) {
+                        Ok(()) => {
                             if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                                 dataflow
                                     .pending_drop_tokens
@@ -477,13 +475,8 @@
                                     .insert(receiver_id.clone());
                             }
                         }
-                        Ok(Err(_)) => {
-                            closed.push(receiver_id);
-                        }
                         Err(_) => {
-                            tracing::warn!(
-                                "dropping input event `{receiver_id}/{input_id}` (send timeout)"
-                            );
                             closed.push(receiver_id);
                         }
                     }
                 }
@@ -520,7 +513,7 @@ impl Daemon {
         &mut self,
         dataflow_id: Uuid,
         node_id: NodeId,
-        event_sender: flume::Sender<daemon_messages::NodeEvent>,
+        event_sender: UnboundedSender<daemon_messages::NodeEvent>,
     ) -> Result<(), String> {
         let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
             format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
@@ -541,24 +534,18 @@
                 .unwrap_or(true)
         });
         for input_id in closed_inputs {
-            let _ = event_sender
-                .send_async(daemon_messages::NodeEvent::InputClosed {
-                    id: input_id.clone(),
-                })
-                .await;
+            let _ = event_sender.send(daemon_messages::NodeEvent::InputClosed {
+                id: input_id.clone(),
+            });
         }
         if dataflow.open_inputs(&node_id).is_empty() {
-            let _ = event_sender
-                .send_async(daemon_messages::NodeEvent::AllInputsClosed)
-                .await;
+            let _ = event_sender.send(daemon_messages::NodeEvent::AllInputsClosed);
         }

         // if a stop event was already sent for the dataflow, send it to
         // the newly connected node too
         if dataflow.stop_sent {
-            let _ = event_sender
-                .send_async(daemon_messages::NodeEvent::Stop)
-                .await;
+            let _ = event_sender.send(daemon_messages::NodeEvent::Stop);
         }

         dataflow.subscribe_channels.insert(node_id, event_sender);
@@ -624,20 +611,15 @@ impl Daemon {
                 continue;
             };

-            let send_result = channel.send_async(daemon_messages::NodeEvent::Input {
+            let send_result = channel.send(daemon_messages::NodeEvent::Input {
                 id: input_id.clone(),
                 metadata: metadata.clone(),
                 data: None,
             });
-            match timeout(Duration::from_millis(1), send_result).await {
-                Ok(Ok(())) => {}
-                Ok(Err(_)) => {
-                    closed.push(receiver_id);
-                }
+            match send_result {
+                Ok(()) => {}
                 Err(_) => {
-                    tracing::info!(
-                        "dropping timer tick event for `{receiver_id}` (send timeout)"
-                    );
                     closed.push(receiver_id);
                 }
             }
         }
@@ -784,24 +766,20 @@ where
             open_inputs.remove(input_id);
         }
         if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
-            let _ = channel
-                .send_async(daemon_messages::NodeEvent::InputClosed {
-                    id: input_id.clone(),
-                })
-                .await;
+            let _ = channel.send(daemon_messages::NodeEvent::InputClosed {
+                id: input_id.clone(),
+            });

             if dataflow.open_inputs(receiver_id).is_empty() {
-                let _ = channel
-                    .send_async(daemon_messages::NodeEvent::AllInputsClosed)
-                    .await;
+                let _ = channel.send(daemon_messages::NodeEvent::AllInputsClosed);
             }
         }
     }
 }

 #[derive(Default)]
 pub struct RunningDataflow {
-    subscribe_channels: HashMap<NodeId, flume::Sender<daemon_messages::NodeEvent>>,
+    subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
     mappings: HashMap<OutputId, BTreeSet<InputId>>,
     timers: BTreeMap<Duration, BTreeSet<InputId>>,
     open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
@@ -822,7 +800,7 @@
 impl RunningDataflow {
     async fn stop_all(&mut self) {
         for (_node_id, channel) in self.subscribe_channels.drain() {
-            let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await;
+            let _ = channel.send(daemon_messages::NodeEvent::Stop);
         }
         self.stop_sent = true;
     }
@@ -838,8 +816,7 @@ impl RunningDataflow {
         let (drop_token, info) = entry.remove_entry();
         let result = match self.subscribe_channels.get_mut(&info.owner) {
             Some(channel) => channel
-                .send_async(daemon_messages::NodeEvent::OutputDropped { drop_token })
-                .await
+                .send(daemon_messages::NodeEvent::OutputDropped { drop_token })
                 .wrap_err("send failed"),
             None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
         };
@@ -899,7 +876,7 @@ pub enum DaemonNodeEvent {
         reply_sender: oneshot::Sender<DaemonReply>,
     },
     Subscribe {
-        event_sender: flume::Sender<daemon_messages::NodeEvent>,
+        event_sender: UnboundedSender<daemon_messages::NodeEvent>,
         reply_sender: oneshot::Sender<DaemonReply>,
     },
     CloseOutputs {
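For context on the listener half of the change (next file): the subscribe handshake now builds a tokio unbounded channel instead of flume::bounded(100). The sender travels to the daemon inside DaemonNodeEvent::Subscribe and the receiver is kept by the listener as subscribed_events. A simplified sketch of that handshake follows; the Subscribe struct and NodeEvent enum below are stand-ins for the crate's real types, and the spawned task stands in for the daemon loop.

use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
enum NodeEvent {
    Stop,
}

// Stand-in for DaemonNodeEvent::Subscribe: the sender half is handed to the daemon.
struct Subscribe {
    event_sender: mpsc::UnboundedSender<NodeEvent>,
    reply_sender: oneshot::Sender<Result<(), String>>,
}

#[tokio::main]
async fn main() {
    // Listener side: create the channel and build the subscribe request.
    let (tx, mut subscribed_events) = mpsc::unbounded_channel();
    let (reply_sender, reply) = oneshot::channel();
    let request = Subscribe { event_sender: tx, reply_sender };

    // Daemon side (normally a separate task that keeps a map of senders):
    // acknowledge the subscription and push an event without ever awaiting,
    // so the main loop cannot be blocked on a send.
    let daemon = tokio::spawn(async move {
        let Subscribe { event_sender, reply_sender } = request;
        let _ = reply_sender.send(Ok(()));
        let _ = event_sender.send(NodeEvent::Stop);
    });

    reply.await.unwrap().unwrap();
    // Listener side: wait for the next event for this node.
    println!("received {:?}", subscribed_events.recv().await.unwrap());
    daemon.await.unwrap();
}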
15 changes: 9 additions & 6 deletions binaries/daemon/src/listener/mod.rs
@@ -11,7 +11,10 @@ use shared_memory_server::{ShmemConf, ShmemServer};
 use std::{mem, net::Ipv4Addr};
 use tokio::{
     net::TcpListener,
-    sync::{mpsc, oneshot},
+    sync::{
+        mpsc::{self, UnboundedReceiver},
+        oneshot,
+    },
 };

 // TODO unify and avoid duplication;
@@ -90,7 +93,7 @@ struct Listener<C> {
     dataflow_id: DataflowId,
     node_id: NodeId,
     daemon_tx: mpsc::Sender<Event>,
-    subscribed_events: Option<flume::Receiver<NodeEvent>>,
+    subscribed_events: Option<UnboundedReceiver<NodeEvent>>,
     queue: Vec<NodeEvent>,
     max_queue_len: usize,
     connection: C,
@@ -275,7 +278,7 @@ where
                 self.process_daemon_event(event, None).await?;
             }
             DaemonRequest::Subscribe => {
-                let (tx, rx) = flume::bounded(100);
+                let (tx, rx) = mpsc::unbounded_channel();
                 let (reply_sender, reply) = oneshot::channel();
                 self.process_daemon_event(
                     DaemonNodeEvent::Subscribe {
@@ -295,9 +298,9 @@
                 let reply = if queued_events.is_empty() {
                     match self.subscribed_events.as_mut() {
                         // wait for next event
-                        Some(events) => match events.recv_async().await {
-                            Ok(event) => DaemonReply::NextEvents(vec![event]),
-                            Err(flume::RecvError::Disconnected) => DaemonReply::NextEvents(vec![]),
+                        Some(events) => match events.recv().await {
+                            Some(event) => DaemonReply::NextEvents(vec![event]),
+                            None => DaemonReply::NextEvents(vec![]),
                         },
                         None => {
                             DaemonReply::Result(Err("Ignoring event request because no subscribe \
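The final hunk is the receive side of the same swap: flume's recv_async() resolves to a Result with a Disconnected error, while tokio's UnboundedReceiver::recv() resolves to an Option that becomes None once every sender is dropped, hence the change from Ok/Err to Some/None arms. A small sketch of the new pattern, with DaemonReply reduced to the single variant used here and the rest illustrative:

use tokio::sync::mpsc;

#[derive(Debug)]
enum NodeEvent {
    AllInputsClosed,
}

#[derive(Debug)]
enum DaemonReply {
    NextEvents(Vec<NodeEvent>),
}

#[tokio::main]
async fn main() {
    let (tx, mut events) = mpsc::unbounded_channel();
    tx.send(NodeEvent::AllInputsClosed).unwrap();
    drop(tx); // once all senders are dropped, recv() yields None

    // Equivalent of the updated listener loop: match Some/None instead of Ok/Err.
    loop {
        let reply = match events.recv().await {
            Some(event) => DaemonReply::NextEvents(vec![event]),
            None => DaemonReply::NextEvents(vec![]),
        };
        println!("{reply:?}");
        if matches!(&reply, DaemonReply::NextEvents(v) if v.is_empty()) {
            break;
        }
    }
}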