Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to access logs through the CLI #259

Merged
merged 21 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ca182bb
Provide a way to access logs through the CLI
haixuanTao Apr 19, 2023
d925d05
Use temp_dir for log files
haixuanTao Apr 24, 2023
cb39cb0
Improve error message for logs
haixuanTao Apr 24, 2023
3b6b766
Make cli less verbose
haixuanTao Apr 24, 2023
fcea39a
Fix clippy issue about partial file writing
haixuanTao Apr 24, 2023
ca0dcdb
Query node-corresponding daemon for logs
haixuanTao Apr 24, 2023
9707c1c
Make sure that logs have been synced to disk before exiting
haixuanTao Apr 24, 2023
29d7ed7
Refactor spawning of log thread to avoid deadlock
haixuanTao Apr 24, 2023
5573e4b
Pipe custom node stdout
haixuanTao Apr 25, 2023
1960614
archive `RunningDataflow` in order to query logs
haixuanTao Apr 26, 2023
e3079a8
Refactor to multi-daemon branch
haixuanTao Apr 28, 2023
e451a6b
Bail if log node has been found on more than one machine
haixuanTao Apr 28, 2023
0854f91
Make a centralised `log_path` function
haixuanTao May 2, 2023
7f82f32
Make logs not fail in the daemon
haixuanTao May 2, 2023
5c98327
Reply logs in a new thread in order to not block daemon when reading …
haixuanTao May 2, 2023
98f7018
Resolve node name dependent on providing the archived nodes or not
haixuanTao May 2, 2023
b8fbfbd
Print log on std stdout/err if logging thread failed
haixuanTao May 2, 2023
1caf7c7
Make daemon not panic if logs are not written
haixuanTao May 2, 2023
664e8d4
Minor refactoring
haixuanTao May 2, 2023
8740314
Wait for logs to be logged before exiting daemon
haixuanTao May 3, 2023
68a690c
Improve error messages when nodes fail as they are no longer reporte…
haixuanTao May 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
653 changes: 612 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ notify = "5.1.0"
ctrlc = "3.2.5"
tracing = "0.1.36"
dora-tracing = { workspace = true, optional = true }
bat = "0.23.0"
45 changes: 45 additions & 0 deletions binaries/cli/src/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context, Result};
use uuid::Uuid;

use bat::{Input, PrettyPrinter};

/// Fetch the logs of a single node from the coordinator and pretty-print
/// them to the terminal.
///
/// The target dataflow is identified either by `uuid` or by `name`
/// (exactly one should be `Some`; the coordinator resolves the name to a
/// UUID on its side). `node` is the ID of the node whose logs are
/// requested.
///
/// # Errors
///
/// Returns an error if the request cannot be serialized or sent, if the
/// reply cannot be parsed, if the coordinator answers with anything other
/// than `ControlRequestReply::Logs`, or if the pager fails to render.
pub fn logs(
    session: &mut TcpRequestReplyConnection,
    uuid: Option<Uuid>,
    name: Option<String>,
    node: String,
) -> Result<()> {
    let logs = {
        let reply_raw = session
            .request(
                &serde_json::to_vec(&ControlRequest::Logs {
                    uuid,
                    name,
                    // cloned because `node` is reused for the output title below
                    node: node.clone(),
                })
                .wrap_err("failed to serialize Logs request")?,
            )
            .wrap_err("failed to send Logs request message")?;

        let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
        match reply {
            ControlRequestReply::Logs(logs) => logs,
            other => bail!("unexpected reply to daemon logs: {other:?}"),
        }
    };

    // Render with syntax highlighting and paging; `QuitIfOneScreen` avoids
    // invoking the pager for short logs.
    PrettyPrinter::new()
        .header(true)
        .grid(true)
        .line_numbers(true)
        .paging_mode(bat::PagingMode::QuitIfOneScreen)
        .inputs(vec![Input::from_bytes(&logs)
            .name("Logs")
            .title(format!("Logs from {node}.").as_str())])
        .print()
        .wrap_err("Something went wrong with viewing log file")?;

    Ok(())
}
17 changes: 15 additions & 2 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod attach;
mod build;
mod check;
mod graph;
mod logs;
mod template;
mod up;

Expand Down Expand Up @@ -43,7 +44,9 @@ enum Command {
open: bool,
},
/// Run build commands provided in the given dataflow.
Build { dataflow: PathBuf },
Build {
dataflow: PathBuf,
},
/// Generate a new project, node or operator. Choose the language between Rust, Python, C or C++.
New {
#[clap(flatten)]
Expand Down Expand Up @@ -85,7 +88,10 @@ enum Command {
List,
// Planned for future releases:
// Dashboard,
// Logs,
Logs {
dataflow: String,
node: String,
},
// Metrics,
// Stats,
// Get,
Expand Down Expand Up @@ -166,6 +172,13 @@ fn run() -> eyre::Result<()> {
coordinator_path.as_deref(),
daemon_path.as_deref(),
)?,
Command::Logs { dataflow, node } => {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;
logs::logs(&mut *session, uuid, name, node)?
}
Command::Start {
dataflow,
name,
Expand Down
165 changes: 148 additions & 17 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use dora_core::{
config::{NodeId, OperatorId},
coordinator_messages::RegisterResult,
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply},
descriptor::Descriptor,
descriptor::{Descriptor, ResolvedNode},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
Expand Down Expand Up @@ -79,6 +79,46 @@ pub async fn start(
Ok((port, future))
}

// Resolve the dataflow name.
// Search for archived dataflows if they are provided.
/// Resolve a dataflow name to its UUID.
///
/// Running dataflows take precedence; archived dataflows are only searched
/// when `archived_dataflows` is `Some` and no running dataflow matches.
///
/// # Errors
///
/// Fails if no dataflow carries `name`, or if the name is ambiguous
/// (matches more than one running — or more than one archived — dataflow).
fn resolve_name(
    name: String,
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: Option<&HashMap<Uuid, ArchivedDataflow>>,
) -> eyre::Result<Uuid> {
    let uuids: Vec<_> = running_dataflows
        .iter()
        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
        .map(|(k, _)| k)
        .copied()
        .collect();
    let archived_uuids: Vec<_> = if let Some(archived_dataflows) = archived_dataflows {
        archived_dataflows
            .iter()
            .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
            .map(|(k, _)| k)
            .copied()
            .collect()
    } else {
        vec![]
    };

    if uuids.is_empty() {
        if archived_uuids.is_empty() {
            bail!("no dataflow with name `{name}`");
        } else if let [uuid] = archived_uuids.as_slice() {
            Ok(*uuid)
        } else {
            // TODO: Index the archived dataflows in order to return logs based on the index.
            bail!("multiple archived dataflows found with name `{name}`; please provide the UUID instead");
        }
    } else if let [uuid] = uuids.as_slice() {
        Ok(*uuid)
    } else {
        bail!("multiple dataflows found with name `{name}`");
    }
}

async fn start_inner(
listener: TcpListener,
tasks: &FuturesUnordered<JoinHandle<()>>,
Expand Down Expand Up @@ -116,6 +156,7 @@ async fn start_inner(
let mut events = (abortable_events, daemon_events).merge();

let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();

while let Some(event) = events.next().await {
Expand Down Expand Up @@ -217,6 +258,11 @@ async fn start_inner(
DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => {
match running_dataflows.entry(uuid) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
// Archive finished dataflow
if archived_dataflows.get(&uuid).is_none() {
archived_dataflows
.insert(uuid, ArchivedDataflow::from(entry.get()));
}
entry.get_mut().machines.remove(&machine_id);
match result {
Ok(()) => {
Expand Down Expand Up @@ -325,20 +371,7 @@ async fn start_inner(
}
ControlRequest::StopByName { name } => {
let stop = async {
let uuids: Vec<_> = running_dataflows
.iter()
.filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
.map(|(k, _)| k)
.copied()
.collect();
let dataflow_uuid = if uuids.is_empty() {
bail!("no running dataflow with name `{name}`");
} else if let [uuid] = uuids.as_slice() {
*uuid
} else {
bail!("multiple dataflows found with name `{name}`");
};

let dataflow_uuid = resolve_name(name, &running_dataflows, None)?;
stop_dataflow(
&running_dataflows,
dataflow_uuid,
Expand All @@ -350,6 +383,25 @@ async fn start_inner(
stop.await
.map(|uuid| ControlRequestReply::DataflowStopped { uuid })
}
ControlRequest::Logs { uuid, name, node } => {
let dataflow_uuid = if let Some(uuid) = uuid {
uuid
} else if let Some(name) = name {
resolve_name(name, &running_dataflows, Some(&archived_dataflows))?
} else {
bail!("No uuid")
};

retrieve_logs(
&running_dataflows,
&archived_dataflows,
dataflow_uuid,
node.into(),
&mut daemon_connections,
)
.await
.map(|logs| ControlRequestReply::Logs(logs))
}
ControlRequest::Destroy => {
tracing::info!("Received destroy command");

Expand Down Expand Up @@ -498,6 +550,21 @@ struct RunningDataflow {
machines: BTreeSet<String>,
/// IDs of machines that are waiting until all nodes are started.
pending_machines: BTreeSet<String>,
nodes: Vec<ResolvedNode>,
}

/// Minimal record of a finished dataflow, kept so that log queries keep
/// working after the dataflow is no longer running.
struct ArchivedDataflow {
    // Optional human-readable name (same as `RunningDataflow::name`).
    name: Option<String>,
    // Resolved node set, needed to map a node ID to its machine when
    // retrieving logs (see `retrieve_logs`).
    nodes: Vec<ResolvedNode>,
}

impl From<&RunningDataflow> for ArchivedDataflow {
    /// Snapshot the name and node list of a running dataflow; all other
    /// runtime state (machines, pending machines) is intentionally dropped.
    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
        ArchivedDataflow {
            name: dataflow.name.clone(),
            nodes: dataflow.nodes.clone(),
        }
    }
}

impl PartialEq for RunningDataflow {
Expand Down Expand Up @@ -586,14 +653,77 @@ async fn reload_dataflow(
Ok(())
}

async fn retrieve_logs(
running_dataflows: &HashMap<Uuid, RunningDataflow>,
archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
dataflow_id: Uuid,
node_id: NodeId,
daemon_connections: &mut HashMap<String, DaemonConnection>,
) -> eyre::Result<Vec<u8>> {
let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
dataflow.nodes.clone()
} else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
dataflow.nodes.clone()
} else {
bail!("No dataflow found with UUID `{dataflow_id}`")
};

let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id: node_id.clone(),
})?;

let machine_ids: Vec<String> = nodes
.iter()
.filter(|node| node.id == node_id)
.map(|node| node.deploy.machine.clone())
.collect();
haixuanTao marked this conversation as resolved.
Show resolved Hide resolved

let machine_id = if let [machine_id] = &machine_ids[..] {
machine_id
} else if machine_ids.is_empty() {
bail!("No machine contains {}/{}", dataflow_id, node_id)
} else {
bail!(
"More than one machine contains {}/{}. However, it should only be present on one.",
dataflow_id,
node_id
)
};

let daemon_connection = daemon_connections
.get_mut(machine_id.as_str())
.wrap_err("no daemon connection")?;
tcp_send(&mut daemon_connection.stream, &message)
.await
.wrap_err("failed to send logs message to daemon")?;

// wait for reply
let reply_raw = tcp_receive(&mut daemon_connection.stream)
.await
.wrap_err("failed to retrieve logs reply from daemon")?;
let reply_logs = match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize logs reply from daemon")?
{
DaemonCoordinatorReply::Logs(logs) => logs,
other => bail!("unexpected reply after sending logs: {other:?}"),
};
tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

reply_logs.map_err(|err| eyre!(err))
}

async fn start_dataflow(
dataflow: Descriptor,
working_dir: PathBuf,
name: Option<String>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow { uuid, machines } =
spawn_dataflow(dataflow, working_dir, daemon_connections).await?;
let SpawnedDataflow {
uuid,
machines,
nodes,
} = spawn_dataflow(dataflow, working_dir, daemon_connections).await?;
Ok(RunningDataflow {
uuid,
name,
Expand All @@ -603,6 +733,7 @@ async fn start_dataflow(
BTreeSet::new()
},
machines,
nodes,
})
}

Expand Down
11 changes: 8 additions & 3 deletions binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{

use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes},
descriptor::Descriptor,
descriptor::{Descriptor, ResolvedNode},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
Expand Down Expand Up @@ -39,7 +39,7 @@ pub(super) async fn spawn_dataflow(
let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir,
nodes,
nodes: nodes.clone(),
communication: dataflow.communication,
machine_listen_ports,
};
Expand All @@ -54,7 +54,11 @@ pub(super) async fn spawn_dataflow(

tracing::info!("successfully spawned dataflow `{uuid}`");

Ok(SpawnedDataflow { uuid, machines })
Ok(SpawnedDataflow {
uuid,
machines,
nodes,
})
}

async fn spawn_dataflow_on_machine(
Expand Down Expand Up @@ -85,4 +89,5 @@ async fn spawn_dataflow_on_machine(
/// Result of successfully spawning a dataflow across the connected daemons.
pub struct SpawnedDataflow {
    // Freshly generated identifier of the dataflow instance.
    pub uuid: Uuid,
    // IDs of all machines that run at least one node of this dataflow.
    pub machines: BTreeSet<String>,
    // Resolved node descriptors, kept so the coordinator can later map a
    // node ID back to its machine (e.g. for log retrieval).
    pub nodes: Vec<ResolvedNode>,
}
Loading