Skip to content

Commit

Permalink
Merge pull request #230 from dora-rs/dont-wait
Browse files Browse the repository at this point in the history
Avoid blocking the daemon main loop by using an unbounded queue
  • Loading branch information
phil-opp authored Mar 22, 2023
2 parents 18d2c55 + 35b493d commit ad4ff0f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 51 deletions.
67 changes: 22 additions & 45 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use std::{
time::Duration,
};
use tcp_utils::tcp_receive;
use tokio::sync::mpsc::UnboundedSender;
use tokio::{
fs,
sync::{mpsc, oneshot},
time::timeout,
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use uuid::Uuid;
Expand Down Expand Up @@ -461,10 +461,8 @@ impl Daemon {
metadata: metadata.clone(),
data: data.clone(),
};
let send_result = channel.send_async(item);

match timeout(Duration::from_millis(10), send_result).await {
Ok(Ok(())) => {
match channel.send(item) {
Ok(()) => {
if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
dataflow
.pending_drop_tokens
Expand All @@ -477,13 +475,8 @@ impl Daemon {
.insert(receiver_id.clone());
}
}
Ok(Err(_)) => {
closed.push(receiver_id);
}
Err(_) => {
tracing::warn!(
"dropping input event `{receiver_id}/{input_id}` (send timeout)"
);
closed.push(receiver_id);
}
}
}
Expand Down Expand Up @@ -520,7 +513,7 @@ impl Daemon {
&mut self,
dataflow_id: Uuid,
node_id: NodeId,
event_sender: flume::Sender<daemon_messages::NodeEvent>,
event_sender: UnboundedSender<daemon_messages::NodeEvent>,
) -> Result<(), String> {
let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
Expand All @@ -541,24 +534,18 @@ impl Daemon {
.unwrap_or(true)
});
for input_id in closed_inputs {
let _ = event_sender
.send_async(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
})
.await;
let _ = event_sender.send(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
});
}
if dataflow.open_inputs(&node_id).is_empty() {
let _ = event_sender
.send_async(daemon_messages::NodeEvent::AllInputsClosed)
.await;
let _ = event_sender.send(daemon_messages::NodeEvent::AllInputsClosed);
}

// if a stop event was already sent for the dataflow, send it to
// the newly connected node too
if dataflow.stop_sent {
let _ = event_sender
.send_async(daemon_messages::NodeEvent::Stop)
.await;
let _ = event_sender.send(daemon_messages::NodeEvent::Stop);
}

dataflow.subscribe_channels.insert(node_id, event_sender);
Expand Down Expand Up @@ -624,20 +611,15 @@ impl Daemon {
continue;
};

let send_result = channel.send_async(daemon_messages::NodeEvent::Input {
let send_result = channel.send(daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: None,
});
match timeout(Duration::from_millis(1), send_result).await {
Ok(Ok(())) => {}
Ok(Err(_)) => {
closed.push(receiver_id);
}
match send_result {
Ok(()) => {}
Err(_) => {
tracing::info!(
"dropping timer tick event for `{receiver_id}` (send timeout)"
);
closed.push(receiver_id);
}
}
}
Expand Down Expand Up @@ -784,24 +766,20 @@ where
open_inputs.remove(input_id);
}
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let _ = channel
.send_async(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
})
.await;
let _ = channel.send(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
});

if dataflow.open_inputs(receiver_id).is_empty() {
let _ = channel
.send_async(daemon_messages::NodeEvent::AllInputsClosed)
.await;
let _ = channel.send(daemon_messages::NodeEvent::AllInputsClosed);
}
}
}
}

#[derive(Default)]
pub struct RunningDataflow {
subscribe_channels: HashMap<NodeId, flume::Sender<daemon_messages::NodeEvent>>,
subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
mappings: HashMap<OutputId, BTreeSet<InputId>>,
timers: BTreeMap<Duration, BTreeSet<InputId>>,
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
Expand All @@ -822,7 +800,7 @@ pub struct RunningDataflow {
impl RunningDataflow {
async fn stop_all(&mut self) {
for (_node_id, channel) in self.subscribe_channels.drain() {
let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await;
let _ = channel.send(daemon_messages::NodeEvent::Stop);
}
self.stop_sent = true;
}
Expand All @@ -838,8 +816,7 @@ impl RunningDataflow {
let (drop_token, info) = entry.remove_entry();
let result = match self.subscribe_channels.get_mut(&info.owner) {
Some(channel) => channel
.send_async(daemon_messages::NodeEvent::OutputDropped { drop_token })
.await
.send(daemon_messages::NodeEvent::OutputDropped { drop_token })
.wrap_err("send failed"),
None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
};
Expand Down Expand Up @@ -899,7 +876,7 @@ pub enum DaemonNodeEvent {
reply_sender: oneshot::Sender<DaemonReply>,
},
Subscribe {
event_sender: flume::Sender<daemon_messages::NodeEvent>,
event_sender: UnboundedSender<daemon_messages::NodeEvent>,
reply_sender: oneshot::Sender<DaemonReply>,
},
CloseOutputs {
Expand Down
15 changes: 9 additions & 6 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use shared_memory_server::{ShmemConf, ShmemServer};
use std::{mem, net::Ipv4Addr};
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
sync::{
mpsc::{self, UnboundedReceiver},
oneshot,
},
};

// TODO unify and avoid duplication;
Expand Down Expand Up @@ -90,7 +93,7 @@ struct Listener<C> {
dataflow_id: DataflowId,
node_id: NodeId,
daemon_tx: mpsc::Sender<Event>,
subscribed_events: Option<flume::Receiver<NodeEvent>>,
subscribed_events: Option<UnboundedReceiver<NodeEvent>>,
queue: Vec<NodeEvent>,
max_queue_len: usize,
connection: C,
Expand Down Expand Up @@ -275,7 +278,7 @@ where
self.process_daemon_event(event, None).await?;
}
DaemonRequest::Subscribe => {
let (tx, rx) = flume::bounded(100);
let (tx, rx) = mpsc::unbounded_channel();
let (reply_sender, reply) = oneshot::channel();
self.process_daemon_event(
DaemonNodeEvent::Subscribe {
Expand All @@ -295,9 +298,9 @@ where
let reply = if queued_events.is_empty() {
match self.subscribed_events.as_mut() {
// wait for next event
Some(events) => match events.recv_async().await {
Ok(event) => DaemonReply::NextEvents(vec![event]),
Err(flume::RecvError::Disconnected) => DaemonReply::NextEvents(vec![]),
Some(events) => match events.recv().await {
Some(event) => DaemonReply::NextEvents(vec![event]),
None => DaemonReply::NextEvents(vec![]),
},
None => {
DaemonReply::Result(Err("Ignoring event request because no subscribe \
Expand Down

0 comments on commit ad4ff0f

Please sign in to comment.