Skip to content

Commit

Permalink
Merge pull request #487 from dora-rs/stop-grace-period-superseded
Browse files Browse the repository at this point in the history
Add grace duration and kill process
  • Loading branch information
phil-opp authored Apr 30, 2024
2 parents ede989c + 9f8bb2c commit 7761a61
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 62 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ jobs:
dora list
dora start dataflow.yml --name ci-rust-test
sleep 10
dora stop --name ci-rust-test
dora stop --name ci-rust-test --grace-duration 5s
dora destroy
- name: "Test CLI (Python)"
timeout-minutes: 30
Expand All @@ -292,7 +292,7 @@ jobs:
dora list
dora start dataflow.yml --name ci-python-test
sleep 10
dora stop --name ci-python-test
dora stop --name ci-python-test --grace-duration 5s
dora destroy
clippy:
Expand Down
35 changes: 31 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ dora-runtime = { workspace = true }
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21"
duration-str = "0.5"
1 change: 1 addition & 0 deletions binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub fn attach_dataflow(
if ctrlc_tx
.send(ControlRequest::Stop {
dataflow_uuid: dataflow_id,
grace_duration: None,
})
.is_err()
{
Expand Down
44 changes: 32 additions & 12 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
};

use attach::attach_dataflow;
use clap::Parser;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
Expand All @@ -17,8 +12,14 @@ use dora_core::{
use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use duration_str::parse;
use eyre::{bail, Context};
use std::net::SocketAddr;
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::Duration,
};
use tokio::runtime::Builder;
use uuid::Uuid;

Expand Down Expand Up @@ -87,6 +88,9 @@ enum Command {
uuid: Option<Uuid>,
#[clap(long)]
name: Option<String>,
#[clap(long)]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
},
/// List running dataflows.
List,
Expand Down Expand Up @@ -269,13 +273,17 @@ fn run() -> eyre::Result<()> {
bail!("No dora coordinator seems to be running.");
}
},
Command::Stop { uuid, name } => {
Command::Stop {
uuid,
name,
grace_duration,
} => {
let mut session =
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?,
(None, None) => stop_dataflow_interactive(&mut *session)?,
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?,
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?,
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
Expand Down Expand Up @@ -361,26 +369,31 @@ fn start_dataflow(
}
}

fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> {
fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
eprintln!("No dataflows are running");
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?;
stop_dataflow(selection.uuid, session)?;
stop_dataflow(selection.uuid, grace_duration, session)?;
}

Ok(())
}

fn stop_dataflow(
uuid: Uuid,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
grace_duration,
})
.unwrap(),
)
Expand All @@ -398,10 +411,17 @@ fn stop_dataflow(

fn stop_dataflow_by_name(
name: String,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap())
.request(
&serde_json::to_vec(&ControlRequest::StopByName {
name,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
Expand Down
69 changes: 47 additions & 22 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,35 +388,41 @@ async fn start_inner(
});
let _ = reply_sender.send(reply);
}
ControlRequest::Stop { dataflow_uuid } => {
ControlRequest::Stop {
dataflow_uuid,
grace_duration,
} => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
dataflow_uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
)
.await?;
}
ControlRequest::StopByName { name } => {
match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
)
.await?
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
ControlRequest::StopByName {
name,
grace_duration,
} => match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
)
.await?
}
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
},
ControlRequest::Logs { uuid, name, node } => {
let dataflow_uuid = if let Some(uuid) = uuid {
uuid
Expand Down Expand Up @@ -548,6 +554,7 @@ async fn stop_dataflow_by_uuid(
daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
Expand All @@ -561,7 +568,14 @@ async fn stop_dataflow_by_uuid(
bail!("no known dataflow found with UUID `{dataflow_uuid}`")
};
let stop = async {
stop_dataflow(dataflow, dataflow_uuid, daemon_connections, timestamp).await?;
stop_dataflow(
dataflow,
dataflow_uuid,
daemon_connections,
timestamp,
grace_duration,
)
.await?;
Result::<_, eyre::Report>::Ok(())
};
match stop.await {
Expand Down Expand Up @@ -623,7 +637,14 @@ async fn handle_destroy(
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for (&uuid, dataflow) in running_dataflows {
stop_dataflow(dataflow, uuid, daemon_connections, clock.new_timestamp()).await?;
stop_dataflow(
dataflow,
uuid,
daemon_connections,
clock.new_timestamp(),
None,
)
.await?;
}
destroy_daemons(daemon_connections, clock.new_timestamp()).await?;
*daemon_events_tx = None;
Expand Down Expand Up @@ -685,9 +706,13 @@ async fn stop_dataflow(
uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid },
inner: DaemonCoordinatorEvent::StopDataflow {
dataflow_id: uuid,
grace_duration,
},
timestamp,
})?;

Expand All @@ -712,7 +737,7 @@ async fn stop_dataflow(
other => bail!("unexpected reply after sending stop: {other:?}"),
}
}
tracing::info!("successfully stopped dataflow `{uuid}`");
tracing::info!("successfully send stop dataflow `{uuid}` to all daemons");

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions binaries/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ async-trait = "0.1.64"
aligned-vec = "0.5.0"
ctrlc = "3.2.5"
which = "5.0.0"
sysinfo = "0.30.11"
Loading

0 comments on commit 7761a61

Please sign in to comment.