diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 469de8c7..7f38d5f7 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -33,8 +33,10 @@ use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tracing::log::error; use uuid::Uuid; mod coordinator; @@ -206,8 +208,7 @@ impl Daemon { while let Some(event) = events.next().await { match event { Event::Coordinator(CoordinatorEvent { event, reply_tx }) => { - let (reply, status) = self.handle_coordinator_event(event).await?; - let _ = reply_tx.send(reply); + let status = self.handle_coordinator_event(event, reply_tx).await?; match status { RunStatus::Continue => {} @@ -258,8 +259,9 @@ impl Daemon { async fn handle_coordinator_event( &mut self, event: DaemonCoordinatorEvent, - ) -> eyre::Result<(Option, RunStatus)> { - let (reply, status) = match event { + reply_tx: Sender>, + ) -> eyre::Result { + let status = match event { DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, working_dir, @@ -291,7 +293,10 @@ impl Daemon { } let reply = DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}"))); - (Some(reply), RunStatus::Continue) + let _ = reply_tx.send(Some(reply)).map_err(|_| { + error!("could not send `SpawnResult` reply from daemon to coordinator") + }); + RunStatus::Continue } DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => { match self.running.get_mut(&dataflow_id) { @@ -305,32 +310,39 @@ impl Daemon { ); } } - (None, RunStatus::Continue) + let _ = reply_tx.send(None).map_err(|_| { + error!("could not send `AllNodesReady` reply from daemon to coordinator") + }); + RunStatus::Continue } DaemonCoordinatorEvent::Logs { dataflow_id, node_id, } => { - // read file - let logs = async { - let log_dir = temp_dir(); - - let mut file = File::open(log_dir.join(log::log_path(&dataflow_id, &node_id))) - .await - .wrap_err("Could not open log file")?; - - let mut contents = vec![]; - file.read_to_end(&mut contents) - .await - .wrap_err("Could not read content of log file")?; - Result::, eyre::Report>::Ok(contents) - } - .await - .map_err(|err| format!("{err:?}")); - ( - Some(DaemonCoordinatorReply::Logs(logs)), - RunStatus::Continue, - ) + tokio::spawn(async move { + let logs = async { + let log_dir = temp_dir(); + + let mut file = + File::open(log_dir.join(log::log_path(&dataflow_id, &node_id))) + .await + .wrap_err("Could not open log file")?; + + let mut contents = vec![]; + file.read_to_end(&mut contents) + .await + .wrap_err("Could not read content of log file")?; + Result::, eyre::Report>::Ok(contents) + } + .await + .map_err(|err| format!("{err:?}")); + let _ = reply_tx + .send(Some(DaemonCoordinatorReply::Logs(logs))) + .map_err(|_| { + error!("could not send logs reply from daemon to coordinator") + }); + }); + RunStatus::Continue } DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, @@ -340,7 +352,10 @@ impl Daemon { let result = self.send_reload(dataflow_id, node_id, operator_id).await; let reply = DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}"))); - (Some(reply), RunStatus::Continue) + let _ = reply_tx + .send(Some(reply)) + .map_err(|_| error!("could not send reload reply from daemon to coordinator")); + RunStatus::Continue } DaemonCoordinatorEvent::StopDataflow { dataflow_id } => { let stop = async { @@ -354,19 +369,25 @@ impl Daemon { let reply = DaemonCoordinatorReply::StopResult( stop.await.map_err(|err| format!("{err:?}")), ); - (Some(reply), RunStatus::Continue) + let _ = reply_tx + .send(Some(reply)) + .map_err(|_| error!("could not send stop reply from daemon to coordinator")); + RunStatus::Continue } DaemonCoordinatorEvent::Destroy => { tracing::info!("received destroy command -> exiting"); let reply = DaemonCoordinatorReply::DestroyResult(Ok(())); - (Some(reply), RunStatus::Exit) + let _ = reply_tx + .send(Some(reply)) + .map_err(|_| error!("could not send destroy reply from daemon to coordinator")); + RunStatus::Exit + } + DaemonCoordinatorEvent::Watchdog => { + let _ = reply_tx.send(Some(DaemonCoordinatorReply::WatchdogAck)); + RunStatus::Continue } - DaemonCoordinatorEvent::Watchdog => ( - Some(DaemonCoordinatorReply::WatchdogAck), - RunStatus::Continue, - ), }; - Ok((reply, status)) + Ok(status) } async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {