From ad0d585758ac75b2f8ce1b9e442fc345a8a8261f Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 4 Apr 2023 14:35:43 +0200 Subject: [PATCH 1/2] Check that dora daemon and coordinator versions match --- binaries/coordinator/src/lib.rs | 50 ++++++++++++++-------- binaries/coordinator/src/listener.rs | 6 ++- binaries/daemon/src/coordinator.rs | 5 ++- libraries/core/src/coordinator_messages.rs | 1 + 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index e4809721..4a11e003 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -105,28 +105,41 @@ async fn start(tasks: &FuturesUnordered>) -> eyre::Result<()> { Event::DaemonConnectError(err) => { tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon")); } - Event::Daemon(event) => { - match event { - DaemonEvent::Register { - machine_id, - mut connection, - } => { - let reply = RegisterResult::Ok; - match tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await { - Ok(()) => { - let previous = - daemon_connections.insert(machine_id.clone(), connection); - if let Some(_previous) = previous { - tracing::info!("closing previous connection `{machine_id}` on new register"); - } - } - Err(err) => { - tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); + Event::Daemon(event) => match event { + DaemonEvent::Register { + machine_id, + mut connection, + dora_version: daemon_version, + } => { + let coordinator_version = &env!("CARGO_PKG_VERSION"); + let reply = if &daemon_version == coordinator_version { + RegisterResult::Ok + } else { + RegisterResult::Err(format!( + "version mismatch: daemon v{daemon_version} is \ + not compatible with coordinator v{coordinator_version}" + )) + }; + let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await; + match (reply, send_result) { + (RegisterResult::Ok, Ok(())) => { + let previous = + 
daemon_connections.insert(machine_id.clone(), connection); + if let Some(_previous) = previous { + tracing::info!( + "closing previous connection `{machine_id}` on new register" + ); } } + (RegisterResult::Err(err), _) => { + tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); + } + (RegisterResult::Ok, Err(err)) => { + tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}"); + } } } - } + }, Event::Dataflow { uuid, event } => match event { DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => { match running_dataflows.entry(uuid) { @@ -564,5 +577,6 @@ pub enum DaemonEvent { Register { machine_id: String, connection: TcpStream, + dora_version: String, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 15c11ad7..01e91404 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -45,10 +45,14 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende // handle the message and translate it to a DaemonEvent match message { - coordinator_messages::CoordinatorRequest::Register { machine_id } => { + coordinator_messages::CoordinatorRequest::Register { + machine_id, + dora_version, + } => { let event = DaemonEvent::Register { machine_id, connection, + dora_version, }; let _ = events_tx.send(Event::Daemon(event)).await; break; diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 69973582..25180669 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -30,7 +30,10 @@ pub async fn register( stream .set_nodelay(true) .wrap_err("failed to set TCP_NODELAY")?; - let register = serde_json::to_vec(&CoordinatorRequest::Register { machine_id })?; + let register = serde_json::to_vec(&CoordinatorRequest::Register { + machine_id, + dora_version: env!("CARGO_PKG_VERSION").to_owned(), + })?; tcp_send(&mut stream, &register)
.await .wrap_err("failed to send register request to dora-coordinator")?; diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 382fe757..32494bbd 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -6,6 +6,7 @@ use crate::daemon_messages::DataflowId; pub enum CoordinatorRequest { Register { machine_id: String, + dora_version: String, }, Event { machine_id: String, From 06e020472b45175a1255e8aefc3afde4feaaa453 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 4 Apr 2023 15:01:02 +0200 Subject: [PATCH 2/2] Check that dora daemon and node API versions match --- .../daemon_connection/communication/mod.rs | 1 + binaries/daemon/src/listener/mod.rs | 30 ++++++++++++++----- libraries/core/src/daemon_messages.rs | 1 + 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/apis/rust/node/src/daemon_connection/communication/mod.rs b/apis/rust/node/src/daemon_connection/communication/mod.rs index 2a380aae..c7f7cf01 100644 --- a/apis/rust/node/src/daemon_connection/communication/mod.rs +++ b/apis/rust/node/src/daemon_connection/communication/mod.rs @@ -37,6 +37,7 @@ impl DaemonChannel { let msg = DaemonRequest::Register { dataflow_id, node_id, + dora_version: env!("CARGO_PKG_VERSION").to_owned(), }; let reply = self .request(&msg) diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs index 4e8a2212..124f7cb4 100644 --- a/binaries/daemon/src/listener/mod.rs +++ b/binaries/daemon/src/listener/mod.rs @@ -133,14 +133,23 @@ impl Listener { DaemonRequest::Register { dataflow_id, node_id, + dora_version: node_api_version, } => { - let reply = DaemonReply::Result(Ok(())); - match connection - .send_reply(reply) + let daemon_version = env!("CARGO_PKG_VERSION"); + let result = if node_api_version == daemon_version { + Ok(()) + } else { + Err(format!( + "version mismatch: node API v{node_api_version} is not compatible \ + with 
daemon v{daemon_version}" + )) + }; + let send_result = connection + .send_reply(DaemonReply::Result(result.clone())) .await - .wrap_err("failed to send register reply") - { - Ok(()) => { + .wrap_err("failed to send register reply"); + match (result, send_result) { + (Ok(()), Ok(())) => { let mut listener = Listener { dataflow_id, node_id, @@ -158,8 +167,13 @@ impl Listener { Err(err) => tracing::error!("{err:?}"), } } - Err(err) => { - tracing::warn!("{err:?}"); + (Err(err), _) => { + tracing::warn!("failed to register node {dataflow_id}/{node_id}: {err}"); + } + (Ok(()), Err(err)) => { + tracing::warn!( + "failed send register reply to node {dataflow_id}/{node_id}: {err:?}" + ); } } } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index fa36e4e1..b890098f 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -37,6 +37,7 @@ pub enum DaemonRequest { Register { dataflow_id: DataflowId, node_id: NodeId, + dora_version: String, }, Subscribe, SendMessage {