Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add grace duration and kill process #487

Merged
merged 4 commits on Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ jobs:
dora list
dora start dataflow.yml --name ci-rust-test
sleep 10
dora stop --name ci-rust-test
dora stop --name ci-rust-test --grace-duration 5s
dora destroy
- name: "Test CLI (Python)"
timeout-minutes: 30
Expand All @@ -292,7 +292,7 @@ jobs:
dora list
dora start dataflow.yml --name ci-python-test
sleep 10
dora stop --name ci-python-test
dora stop --name ci-python-test --grace-duration 5s
dora destroy

clippy:
Expand Down
35 changes: 31 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ dora-runtime = { workspace = true }
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21"
duration-str = "0.5"
1 change: 1 addition & 0 deletions binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub fn attach_dataflow(
if ctrlc_tx
.send(ControlRequest::Stop {
dataflow_uuid: dataflow_id,
grace_duration: None,
})
.is_err()
{
Expand Down
44 changes: 32 additions & 12 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
};

use attach::attach_dataflow;
use clap::Parser;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
Expand All @@ -17,8 +12,14 @@ use dora_core::{
use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use duration_str::parse;
use eyre::{bail, Context};
use std::net::SocketAddr;
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::Duration,
};
use tokio::runtime::Builder;
use uuid::Uuid;

Expand Down Expand Up @@ -87,6 +88,9 @@ enum Command {
uuid: Option<Uuid>,
#[clap(long)]
name: Option<String>,
#[clap(long)]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
},
/// List running dataflows.
List,
Expand Down Expand Up @@ -269,13 +273,17 @@ fn run() -> eyre::Result<()> {
bail!("No dora coordinator seems to be running.");
}
},
Command::Stop { uuid, name } => {
Command::Stop {
uuid,
name,
grace_duration,
} => {
let mut session =
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?,
(None, None) => stop_dataflow_interactive(&mut *session)?,
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?,
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?,
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
Expand Down Expand Up @@ -361,26 +369,31 @@ fn start_dataflow(
}
}

fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> {
fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
eprintln!("No dataflows are running");
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?;
stop_dataflow(selection.uuid, session)?;
stop_dataflow(selection.uuid, grace_duration, session)?;
}

Ok(())
}

fn stop_dataflow(
uuid: Uuid,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
grace_duration,
})
.unwrap(),
)
Expand All @@ -398,10 +411,17 @@ fn stop_dataflow(

fn stop_dataflow_by_name(
name: String,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap())
.request(
&serde_json::to_vec(&ControlRequest::StopByName {
name,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
Expand Down
69 changes: 47 additions & 22 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,35 +388,41 @@ async fn start_inner(
});
let _ = reply_sender.send(reply);
}
ControlRequest::Stop { dataflow_uuid } => {
ControlRequest::Stop {
dataflow_uuid,
grace_duration,
} => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
dataflow_uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
)
.await?;
}
ControlRequest::StopByName { name } => {
match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
)
.await?
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
ControlRequest::StopByName {
name,
grace_duration,
} => match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
)
.await?
}
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
},
ControlRequest::Logs { uuid, name, node } => {
let dataflow_uuid = if let Some(uuid) = uuid {
uuid
Expand Down Expand Up @@ -548,6 +554,7 @@ async fn stop_dataflow_by_uuid(
daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
Expand All @@ -561,7 +568,14 @@ async fn stop_dataflow_by_uuid(
bail!("no known dataflow found with UUID `{dataflow_uuid}`")
};
let stop = async {
stop_dataflow(dataflow, dataflow_uuid, daemon_connections, timestamp).await?;
stop_dataflow(
dataflow,
dataflow_uuid,
daemon_connections,
timestamp,
grace_duration,
)
.await?;
Result::<_, eyre::Report>::Ok(())
};
match stop.await {
Expand Down Expand Up @@ -623,7 +637,14 @@ async fn handle_destroy(
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for (&uuid, dataflow) in running_dataflows {
stop_dataflow(dataflow, uuid, daemon_connections, clock.new_timestamp()).await?;
stop_dataflow(
dataflow,
uuid,
daemon_connections,
clock.new_timestamp(),
None,
)
.await?;
}
destroy_daemons(daemon_connections, clock.new_timestamp()).await?;
*daemon_events_tx = None;
Expand Down Expand Up @@ -685,9 +706,13 @@ async fn stop_dataflow(
uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid },
inner: DaemonCoordinatorEvent::StopDataflow {
dataflow_id: uuid,
grace_duration,
},
timestamp,
})?;

Expand All @@ -712,7 +737,7 @@ async fn stop_dataflow(
other => bail!("unexpected reply after sending stop: {other:?}"),
}
}
tracing::info!("successfully stopped dataflow `{uuid}`");
tracing::info!("successfully send stop dataflow `{uuid}` to all daemons");

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions binaries/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ async-trait = "0.1.64"
aligned-vec = "0.5.0"
ctrlc = "3.2.5"
which = "5.0.0"
sysinfo = "0.30.11"
Loading
Loading