Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI: Improve error messages when coordinator is not running #254

Merged
merged 4 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;

use crate::control_connection;

pub fn attach_dataflow(
dataflow: Descriptor,
dataflow_path: PathBuf,
dataflow_id: Uuid,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
hot_reload: bool,
) -> Result<(), eyre::ErrReport> {
let (tx, rx) = mpsc::sync_channel(2);
Expand Down Expand Up @@ -70,7 +68,7 @@ pub fn attach_dataflow(
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) {
watcher_tx
.send(ControlRequest::Reload {
dataflow_id: dataflow_id.clone(),
dataflow_id: *dataflow_id,
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
Expand Down Expand Up @@ -123,7 +121,7 @@ pub fn attach_dataflow(
Ok(reload_event) => reload_event,
};

let reply_raw = control_connection(session)?
let reply_raw = session
.request(&serde_json::to_vec(&control_request)?)
.wrap_err("failed to send request message to coordinator")?;
let result: ControlRequestReply =
Expand Down
64 changes: 31 additions & 33 deletions binaries/cli/src/check.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::control_connection;
use crate::connect_to_coordinator;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use std::io::Write;
Expand All @@ -16,19 +17,30 @@ pub fn check_environment() -> eyre::Result<()> {

// check whether coordinator is running
write!(stdout, "Dora Coordinator: ")?;
if coordinator_running()? {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
} else {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
writeln!(stdout, "not running")?;
error_occured = true;
}
let mut session = match connect_to_coordinator() {
Ok(session) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
Some(session)
}
Err(_) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
haixuanTao marked this conversation as resolved.
Show resolved Hide resolved
writeln!(stdout, "not running")?;
error_occured = true;
None
}
};

let _ = stdout.reset();

// check whether daemon is running
write!(stdout, "Dora Daemon: ")?;
if daemon_running()? {
if session
.as_deref_mut()
.map(daemon_running)
.transpose()?
.unwrap_or(false)
{
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
} else {
Expand All @@ -47,30 +59,16 @@ pub fn check_environment() -> eyre::Result<()> {
Ok(())
}

pub fn coordinator_running() -> Result<bool, eyre::ErrReport> {
let mut control_session = None;
let connected = control_connection(&mut control_session).is_ok();
Ok(connected)
}

pub fn daemon_running() -> Result<bool, eyre::ErrReport> {
let mut control_session = None;
let running = match control_connection(&mut control_session) {
Ok(connection) => {
let reply_raw = connection
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;
pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result<bool, eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;

let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
}
}
Err(_) => {
// coordinator is not running
false
}
let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let running = match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
};

Ok(running)
}
69 changes: 39 additions & 30 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,18 @@ enum Lang {
Cxx,
}

fn main() -> eyre::Result<()> {
fn main() {
if let Err(err) = run() {
eprintln!("{err:#}");
std::process::exit(1);
}
}

fn run() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
let args = Args::parse();

let mut session = None;

match args.command {
Command::Check {
dataflow,
Expand Down Expand Up @@ -171,25 +176,36 @@ fn main() -> eyre::Result<()> {
dataflow_description
.check(&dataflow, None)
.wrap_err("Could not validate yaml")?;
let dataflow_id = start_dataflow(dataflow.clone(), name, &mut session)?;
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(dataflow.clone(), name, &mut *session)?;

if attach {
attach_dataflow(
dataflow_description,
dataflow,
dataflow_id,
&mut session,
&mut *session,
hot_reload,
)?
}
}
Command::List => list(&mut session)?,
Command::Stop { uuid, name } => match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, &mut session)?,
(None, Some(name)) => stop_dataflow_by_name(name, &mut session)?,
(None, None) => stop_dataflow_interactive(&mut session)?,
Command::List => match connect_to_coordinator() {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
}
},
Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?,
Command::Stop { uuid, name } => {
let mut session =
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?,
(None, None) => stop_dataflow_interactive(&mut *session)?,
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
}

Ok(())
Expand All @@ -198,12 +214,12 @@ fn main() -> eyre::Result<()> {
fn start_dataflow(
dataflow: PathBuf,
name: Option<String>,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<Uuid, eyre::ErrReport> {
let canonicalized = dataflow
.canonicalize()
.wrap_err("given dataflow file does not exist")?;
let reply_raw = control_connection(session)?
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow_path: canonicalized,
Expand All @@ -225,9 +241,7 @@ fn start_dataflow(
}
}

fn stop_dataflow_interactive(
session: &mut Option<Box<TcpRequestReplyConnection>>,
) -> eyre::Result<()> {
fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
eprintln!("No dataflows are running");
Expand All @@ -241,9 +255,9 @@ fn stop_dataflow_interactive(

fn stop_dataflow(
uuid: Uuid,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = control_connection(session)?
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
Expand All @@ -262,9 +276,9 @@ fn stop_dataflow(

fn stop_dataflow_by_name(
name: String,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = control_connection(session)?
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap())
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
Expand All @@ -276,7 +290,7 @@ fn stop_dataflow_by_name(
}
}

fn list(session: &mut Option<Box<TcpRequestReplyConnection>>) -> Result<(), eyre::ErrReport> {
fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let ids = query_running_dataflows(session)?;

if ids.is_empty() {
Expand All @@ -292,9 +306,9 @@ fn list(session: &mut Option<Box<TcpRequestReplyConnection>>) -> Result<(), eyre
}

fn query_running_dataflows(
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<Vec<DataflowId>, eyre::ErrReport> {
let reply_raw = control_connection(session)?
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
Expand All @@ -308,11 +322,6 @@ fn query_running_dataflows(
Ok(ids)
}

fn control_connection(
session: &mut Option<Box<TcpRequestReplyConnection>>,
) -> eyre::Result<&mut Box<TcpRequestReplyConnection>> {
Ok(match session {
Some(session) => session,
None => session.insert(TcpLayer::new().connect(control_socket_addr())?),
})
fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(control_socket_addr())
}
54 changes: 30 additions & 24 deletions binaries/cli/src/up.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use crate::{
check::{coordinator_running, daemon_running},
control_connection,
};
use communication_layer_request_reply::TcpRequestReplyConnection;
use crate::{check::daemon_running, connect_to_coordinator};
use dora_core::topics::ControlRequest;
use eyre::Context;
use std::{fs, path::Path, process::Command, time::Duration};
Expand All @@ -17,34 +13,44 @@ pub(crate) fn up(
) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;

if !coordinator_running()? {
start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?;
// sleep a bit until the coordinator accepts connections
while !coordinator_running()? {
std::thread::sleep(Duration::from_millis(50));
let mut session = match connect_to_coordinator() {
Ok(session) => session,
Err(_) => {
start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of spawning a coordinator if we failed to connect to the coordinator. What if the dora-coordinator should be on a different machine?

I think its ok to fail the request if the coordinator is not reachable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only do this for the dora up command, which I imagine will be mostly used for local testing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I thought it was on all connections sorry!

I still feel like if there is a need to run up two times to get the dora-coordinator up, it should not be a feature and there should be an investigation. What if the first spawning worked but we did not reach it and end up somehow with two coordinator.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about the "failed to start dora-coordinator" error?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misread the file, the first time, and I was spooked by the spawn on Err(_), but I misunderstood it at a failure of spawning a first time.

But, I still think that we should not start_coordinator on failing to connect to the coordinator. Let say there is an internet connection issue and we fail to connect to the coordinator. That will makes us try to spawn a new coordinator.

I think that the design pattern Err(_) => Spawn might create even more error. I would prefer to try spawning the coordinator without trying to connect to the coordinator and raise an error on the port being already taken or an error of the kind.

This seems to be more natural and more common when spawning servers.

But open for discussion

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to try spawning the coordinator without trying to connect to the coordinator and raise an error on the port being already taken or an error of the kind.

I'm generally fine with this approach as well, but unfortunately this would be a breaking change. Right now, you can run dora up multiple times without errors. So I think it's best to leave the behavior as is for now and revisit this once we're planning version 0.3.0. If you like we can create some sort of tracking issue for the 0.3.0 release so that we don't forget about it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth noting that the same applies to the dora check command. Right now, the command assumes a local setup and performs the same logic for checking the coordinator and daemon setup. So we might want to redesign this command too once we have support for distributed deployments (see #256).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true, that it is a breaking change. Make sense to keep it as it is then


loop {
match connect_to_coordinator() {
Ok(session) => break session,
Err(_) => {
// sleep a bit until the coordinator accepts connections
std::thread::sleep(Duration::from_millis(50));
}
}
}
}
}
if !daemon_running()? {
};

if !daemon_running(&mut *session)? {
start_daemon(daemon).wrap_err("failed to start dora-daemon")?;
}

Ok(())
}

pub(crate) fn destroy(
config_path: Option<&Path>,
session: &mut Option<Box<TcpRequestReplyConnection>>,
) -> Result<(), eyre::ErrReport> {
pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;

if coordinator_running()? {
// send destroy command to dora-coordinator
control_connection(session)?
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
println!("Send destroy command to dora-coordinator");
} else {
eprintln!("The dora-coordinator is not running");
match connect_to_coordinator() {
Ok(mut session) => {
// send destroy command to dora-coordinator
session
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
println!("Send destroy command to dora-coordinator");
}
Err(_) => {
eprintln!("Could not connect to dora-coordinator");
}
}

Ok(())
Expand Down