Skip to content

Commit

Permalink
Provide a way to access logs through the CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
haixuanTao committed Apr 19, 2023
1 parent 9042354 commit 186a04c
Show file tree
Hide file tree
Showing 9 changed files with 802 additions and 53 deletions.
628 changes: 595 additions & 33 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ notify = "5.1.0"
ctrlc = "3.2.5"
tracing = "0.1.36"
dora-tracing = { workspace = true, optional = true }
bat = "0.23.0"
39 changes: 39 additions & 0 deletions binaries/cli/src/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context, Result};
use uuid::Uuid;

use crate::control_connection;
use bat::{Input, PrettyPrinter};

pub fn logs(uuid: Option<Uuid>, name: Option<String>, node: String) -> Result<()> {
let mut control_session = None;
let connection = control_connection(&mut control_session)?;
let logs = {
let reply_raw = connection
.request(&serde_json::to_vec(&ControlRequest::Logs {
uuid,
name,
node: node.clone(),
})?)
.wrap_err("failed to send DaemonConnected message")?;

let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match reply {
ControlRequestReply::Logs { logs } => logs,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
}
};

PrettyPrinter::new()
.header(true)
.grid(true)
.line_numbers(true)
.paging_mode(bat::PagingMode::Always)
.inputs(vec![Input::from_bytes(&logs)
.name("Logs") // TODO: Make a better name
.title(format!("Logs from {node}.").as_str())])
.print()
.unwrap();

Ok(())
}
9 changes: 8 additions & 1 deletion binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod attach;
mod build;
mod check;
mod graph;
mod logs;
mod template;
mod up;

Expand Down Expand Up @@ -87,7 +88,12 @@ enum Command {
List,
// Planned for future releases:
// Dashboard,
// Logs,
Logs {
uuid: Option<Uuid>,
#[clap(long)]
name: Option<String>,
node: String,
},
// Metrics,
// Stats,
// Get,
Expand Down Expand Up @@ -160,6 +166,7 @@ fn main() -> eyre::Result<()> {
coordinator_path.as_deref(),
daemon_path.as_deref(),
)?,
Command::Logs { uuid, name, node } => logs::logs(uuid, name, node)?,
Command::Start {
dataflow,
name,
Expand Down
90 changes: 76 additions & 14 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ pub async fn run() -> eyre::Result<()> {
Ok(())
}

/// Resolve a dataflow name to the UUID of the single running dataflow with
/// that name.
///
/// Fails if no running dataflow matches the name, or if the name is
/// ambiguous (more than one match).
fn resolve_name(
    name: String,
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
) -> eyre::Result<Uuid> {
    // Collect the UUIDs of every running dataflow whose name matches.
    let matching: Vec<Uuid> = running_dataflows
        .iter()
        .filter_map(|(uuid, dataflow)| {
            (dataflow.name.as_deref() == Some(name.as_str())).then_some(*uuid)
        })
        .collect();

    match matching.as_slice() {
        [] => bail!("no running dataflow with name `{name}`"),
        [uuid] => Ok(*uuid),
        _ => bail!("multiple dataflows found with name `{name}`"),
    }
}

async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
let ctrlc_events = set_up_ctrlc_handler()?;

Expand Down Expand Up @@ -247,20 +266,7 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
}
ControlRequest::StopByName { name } => {
let stop = async {
let uuids: Vec<_> = running_dataflows
.iter()
.filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
.map(|(k, _)| k)
.copied()
.collect();
let dataflow_uuid = if uuids.is_empty() {
bail!("no running dataflow with name `{name}`");
} else if let [uuid] = uuids.as_slice() {
*uuid
} else {
bail!("multiple dataflows found with name `{name}`");
};

let dataflow_uuid = resolve_name(name, &running_dataflows)?;
stop_dataflow(
&running_dataflows,
dataflow_uuid,
Expand All @@ -272,6 +278,24 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
stop.await
.map(|uuid| ControlRequestReply::DataflowStopped { uuid })
}
ControlRequest::Logs { uuid, name, node } => {
    // Resolve the target dataflow: an explicit UUID takes precedence,
    // otherwise the name is looked up among the running dataflows.
    let dataflow_uuid = if let Some(uuid) = uuid {
        uuid
    } else if let Some(name) = name {
        resolve_name(name, &running_dataflows)?
    } else {
        // Neither identifier given — report a usable error instead of
        // the bare "No uuid".
        bail!("either a dataflow UUID or a dataflow name must be provided")
    };

    retrieve_logs(
        &running_dataflows,
        dataflow_uuid,
        node.into(),
        &mut daemon_connections,
    )
    .await
    .map(|logs| ControlRequestReply::Logs { logs })
}
ControlRequest::Destroy => {
tracing::info!("Received destroy command");

Expand Down Expand Up @@ -497,6 +521,44 @@ async fn reload_dataflow(
Ok(())
}

/// Ask every daemon running part of the given dataflow for the logs of
/// `node_id` and return the concatenated log bytes.
///
/// Fails if the dataflow is not running, a daemon connection is missing,
/// or a daemon replies with anything other than a `Logs` reply.
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<Vec<u8>> {
    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
        bail!("No running dataflow found with UUID `{dataflow_id}`")
    };
    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs {
        dataflow_id,
        node_id,
    })?;
    let mut reply_logs = Vec::new();
    for machine_id in &dataflow.machines {
        let daemon_connection = daemon_connections
            .get_mut(machine_id)
            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
        tcp_send(daemon_connection, &message)
            .await
            .wrap_err("failed to send logs request message to daemon")?;

        // wait for reply
        let reply_raw = tcp_receive(daemon_connection)
            .await
            .wrap_err("failed to receive logs reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize logs reply from daemon")?
        {
            // Accumulate instead of overwrite, so logs from every machine
            // of a multi-machine dataflow are returned (previously only
            // the last machine's logs survived).
            DaemonCoordinatorReply::Logs { logs } => reply_logs.extend(logs),
            other => bail!("unexpected reply after sending logs request: {other:?}"),
        }
    }
    tracing::info!("successfully retrieved logs for dataflow `{dataflow_id}`");

    Ok(reply_logs)
}

async fn start_dataflow(
path: &Path,
name: Option<String>,
Expand Down
23 changes: 23 additions & 0 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use std::{
time::Duration,
};
use tcp_utils::tcp_receive;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
Expand Down Expand Up @@ -261,6 +263,27 @@ impl Daemon {
DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
(reply, RunStatus::Continue)
}
DaemonCoordinatorEvent::Logs {
    dataflow_id,
    node_id,
} => {
    // Read the node's log file written by `spawn.rs`. A read failure is
    // reported back inside the log payload instead of panicking, so a
    // missing file cannot crash the whole daemon.
    // TODO: include `dataflow_id` in the log path so node ids from
    // different dataflows don't collide (must match the writer's path).
    let logs = match tokio::fs::read(format!("logs/{node_id}.txt")).await {
        Ok(contents) => contents,
        Err(err) => format!(
            "could not read log file of node `{node_id}` (dataflow `{dataflow_id}`): {err}"
        )
        .into_bytes(),
    };
    (DaemonCoordinatorReply::Logs { logs }, RunStatus::Continue)
}
DaemonCoordinatorEvent::ReloadDataflow {
dataflow_id,
node_id,
Expand Down
54 changes: 49 additions & 5 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ use dora_core::{
use dora_download::download_file;
use eyre::WrapErr;
use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio};
use tokio::sync::mpsc;
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncWriteExt},
sync::mpsc,
};
use tracing::info;

pub async fn spawn_node(
dataflow_id: DataflowId,
Expand Down Expand Up @@ -148,20 +153,59 @@ pub async fn spawn_node(
}
}

command.spawn().wrap_err(format!(
"failed to run runtime {}/{}",
runtime_config.node.dataflow_id, runtime_config.node.node_id
))?
command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.wrap_err(format!(
"failed to run runtime {}/{}",
runtime_config.node.dataflow_id, runtime_config.node.node_id
))?
}
};

tokio::spawn(async move {
    // Capture the node's stdout and stderr into `logs/<node id>.txt` so
    // the CLI can retrieve them later, while also forwarding each line to
    // the daemon's own tracing output.
    // TODO: add a timestamp to the log file name so successive runs don't
    // overwrite each other.

    // Make sure the log directory exists before creating the file.
    tokio::fs::create_dir_all("logs")
        .await
        .expect("Failed to create log directory");
    let mut file = File::create(format!("logs/{node_id}.txt"))
        .await
        .expect("Failed to create log file");

    // NOTE(review): stdout is drained to completion before stderr is read.
    // A node that writes a lot to stderr early could fill the stderr pipe
    // and stall — consider interleaving both streams (e.g. via select!).
    let mut stdout_lines =
        (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")))
            .lines();

    while let Ok(Some(line)) = stdout_lines.next_line().await {
        // `write_all` instead of `write`: `write` may perform a partial
        // write and silently drop the remaining bytes.
        file.write_all(line.as_bytes()).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        info!(line);
    }

    let mut stderr_lines =
        (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr")))
            .lines();

    while let Ok(Some(line)) = stderr_lines.next_line().await {
        file.write_all(line.as_bytes()).await.unwrap();
        file.write_all(b"\n").await.unwrap();
        info!(line);
    }

    // Flush file contents to disk before reporting the node as finished.
    file.sync_all().await.unwrap();

    let exit_status = NodeExitStatus::from(child.wait().await);
    let event = DoraEvent::SpawnedNodeResult {
        dataflow_id,
        node_id,
        exit_status,
    };

    let _ = daemon_tx.send(event.into()).await;
});
Ok(())
Expand Down
5 changes: 5 additions & 0 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ pub enum DaemonCoordinatorEvent {
},
Destroy,
Watchdog,
Logs {
dataflow_id: Uuid,
node_id: NodeId,
},
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
Expand All @@ -212,6 +216,7 @@ pub enum DaemonCoordinatorReply {
StopResult(Result<(), String>),
DestroyResult(Result<(), String>),
WatchdogAck,
Logs { logs: Vec<u8> },
}

pub type DataflowId = Uuid;
Expand Down
6 changes: 6 additions & 0 deletions libraries/core/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ pub enum ControlRequest {
StopByName {
name: String,
},
Logs {
uuid: Option<Uuid>,
name: Option<String>,
node: String,
},
Destroy,
List,
DaemonConnected,
Expand All @@ -50,6 +55,7 @@ pub enum ControlRequestReply {
DataflowList { dataflows: Vec<DataflowId> },
DestroyOk,
DaemonConnected(bool),
Logs { logs: Vec<u8> },
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand Down

0 comments on commit 186a04c

Please sign in to comment.