Skip to content

Commit

Permalink
Reply logs in a new thread in order to not block daemon when reading logs
Browse files Browse the repository at this point in the history
  • Loading branch information
haixuanTao committed May 2, 2023
1 parent 7f82f32 commit 5c98327
Showing 1 changed file with 55 additions and 34 deletions.
89 changes: 55 additions & 34 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::log::error;
use uuid::Uuid;

mod coordinator;
Expand Down Expand Up @@ -206,8 +208,7 @@ impl Daemon {
while let Some(event) = events.next().await {
match event {
Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
let (reply, status) = self.handle_coordinator_event(event).await?;
let _ = reply_tx.send(reply);
let status = self.handle_coordinator_event(event, reply_tx).await?;

match status {
RunStatus::Continue => {}
Expand Down Expand Up @@ -258,8 +259,9 @@ impl Daemon {
async fn handle_coordinator_event(
&mut self,
event: DaemonCoordinatorEvent,
) -> eyre::Result<(Option<DaemonCoordinatorReply>, RunStatus)> {
let (reply, status) = match event {
reply_tx: Sender<Option<DaemonCoordinatorReply>>,
) -> eyre::Result<RunStatus> {
let status = match event {
DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
dataflow_id,
working_dir,
Expand Down Expand Up @@ -291,7 +293,10 @@ impl Daemon {
}
let reply =
DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
(Some(reply), RunStatus::Continue)
let _ = reply_tx.send(Some(reply)).map_err(|_| {
error!("could not send `SpawnResult` reply from daemon to coordinator")
});
RunStatus::Continue
}
DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => {
match self.running.get_mut(&dataflow_id) {
Expand All @@ -305,32 +310,39 @@ impl Daemon {
);
}
}
(None, RunStatus::Continue)
let _ = reply_tx.send(None).map_err(|_| {
error!("could not send `AllNodesReady` reply from daemon to coordinator")
});
RunStatus::Continue
}
DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id,
} => {
// read file
let logs = async {
let log_dir = temp_dir();

let mut file = File::open(log_dir.join(log::log_path(&dataflow_id, &node_id)))
.await
.wrap_err("Could not open log file")?;

let mut contents = vec![];
file.read_to_end(&mut contents)
.await
.wrap_err("Could not read content of log file")?;
Result::<Vec<u8>, eyre::Report>::Ok(contents)
}
.await
.map_err(|err| format!("{err:?}"));
(
Some(DaemonCoordinatorReply::Logs(logs)),
RunStatus::Continue,
)
tokio::spawn(async move {
let logs = async {
let log_dir = temp_dir();

let mut file =
File::open(log_dir.join(log::log_path(&dataflow_id, &node_id)))
.await
.wrap_err("Could not open log file")?;

let mut contents = vec![];
file.read_to_end(&mut contents)
.await
.wrap_err("Could not read content of log file")?;
Result::<Vec<u8>, eyre::Report>::Ok(contents)
}
.await
.map_err(|err| format!("{err:?}"));
let _ = reply_tx
.send(Some(DaemonCoordinatorReply::Logs(logs)))
.map_err(|_| {
error!("could not send logs reply from daemon to coordinator")
});
});
RunStatus::Continue
}
DaemonCoordinatorEvent::ReloadDataflow {
dataflow_id,
Expand All @@ -340,7 +352,10 @@ impl Daemon {
let result = self.send_reload(dataflow_id, node_id, operator_id).await;
let reply =
DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
(Some(reply), RunStatus::Continue)
let _ = reply_tx
.send(Some(reply))
.map_err(|_| error!("could not send reload reply from daemon to coordinator"));
RunStatus::Continue
}
DaemonCoordinatorEvent::StopDataflow { dataflow_id } => {
let stop = async {
Expand All @@ -354,19 +369,25 @@ impl Daemon {
let reply = DaemonCoordinatorReply::StopResult(
stop.await.map_err(|err| format!("{err:?}")),
);
(Some(reply), RunStatus::Continue)
let _ = reply_tx
.send(Some(reply))
.map_err(|_| error!("could not send stop reply from daemon to coordinator"));
RunStatus::Continue
}
DaemonCoordinatorEvent::Destroy => {
tracing::info!("received destroy command -> exiting");
let reply = DaemonCoordinatorReply::DestroyResult(Ok(()));
(Some(reply), RunStatus::Exit)
let _ = reply_tx
.send(Some(reply))
.map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
RunStatus::Exit
}
DaemonCoordinatorEvent::Watchdog => {
let _ = reply_tx.send(Some(DaemonCoordinatorReply::WatchdogAck));
RunStatus::Continue
}
DaemonCoordinatorEvent::Watchdog => (
Some(DaemonCoordinatorReply::WatchdogAck),
RunStatus::Continue,
),
};
Ok((reply, status))
Ok(status)
}

async fn handle_inter_daemon_event(&mut self, event: InterDaemonEvent) -> eyre::Result<()> {
Expand Down

0 comments on commit 5c98327

Please sign in to comment.