Skip to content

Commit

Permalink
Merge pull request #245 from dora-rs/check-version
Browse files Browse the repository at this point in the history
Check that coordinator, daemon, and node versions match
  • Loading branch information
phil-opp authored Apr 5, 2023
2 parents 1f7b8d8 + 06e0204 commit 9859a2f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 28 deletions.
1 change: 1 addition & 0 deletions apis/rust/node/src/daemon_connection/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl DaemonChannel {
let msg = DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
};
let reply = self
.request(&msg)
Expand Down
50 changes: 32 additions & 18 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,28 +105,41 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
Event::DaemonConnectError(err) => {
tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
}
Event::Daemon(event) => {
match event {
DaemonEvent::Register {
machine_id,
mut connection,
} => {
let reply = RegisterResult::Ok;
match tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await {
Ok(()) => {
let previous =
daemon_connections.insert(machine_id.clone(), connection);
if let Some(_previous) = previous {
tracing::info!("closing previous connection `{machine_id}` on new register");
}
}
Err(err) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
Event::Daemon(event) => match event {
DaemonEvent::Register {
machine_id,
mut connection,
dora_version: daemon_version,
} => {
let coordinator_version = &env!("CARGO_PKG_VERSION");
let reply = if &daemon_version == coordinator_version {
RegisterResult::Ok
} else {
RegisterResult::Err(format!(
"version mismatch: daemon v{daemon_version} is \
not compatible with coordinator v{coordinator_version}"
))
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply, send_result) {
(RegisterResult::Ok, Ok(())) => {
let previous =
daemon_connections.insert(machine_id.clone(), connection);
if let Some(_previous) = previous {
tracing::info!(
"closing previous connection `{machine_id}` on new register"
);
}
}
(RegisterResult::Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
}
(RegisterResult::Ok, Err(err)) => {
tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
}
}
}
}
},
Event::Dataflow { uuid, event } => match event {
DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => {
match running_dataflows.entry(uuid) {
Expand Down Expand Up @@ -564,5 +577,6 @@ pub enum DaemonEvent {
Register {
machine_id: String,
connection: TcpStream,
dora_version: String,
},
}
6 changes: 5 additions & 1 deletion binaries/coordinator/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende

// handle the message and translate it to a DaemonEvent
match message {
coordinator_messages::CoordinatorRequest::Register { machine_id } => {
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,
} => {
let event = DaemonEvent::Register {
machine_id,
connection,
dora_version,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;
Expand Down
5 changes: 4 additions & 1 deletion binaries/daemon/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub async fn register(
stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
let register = serde_json::to_vec(&CoordinatorRequest::Register { machine_id })?;
let register = serde_json::to_vec(&CoordinatorRequest::Register {
machine_id,
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
})?;
tcp_send(&mut stream, &register)
.await
.wrap_err("failed to send register request to dora-coordinator")?;
Expand Down
30 changes: 22 additions & 8 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,23 @@ impl Listener {
DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: node_api_version,
} => {
let reply = DaemonReply::Result(Ok(()));
match connection
.send_reply(reply)
let daemon_version = env!("CARGO_PKG_VERSION");
let result = if node_api_version == daemon_version {
Ok(())
} else {
Err(format!(
"version mismatch: node API v{node_api_version} is not compatible \
with daemon v{daemon_version}"
))
};
let send_result = connection
.send_reply(DaemonReply::Result(result.clone()))
.await
.wrap_err("failed to send register reply")
{
Ok(()) => {
.wrap_err("failed to send register reply");
match (result, send_result) {
(Ok(()), Ok(())) => {
let mut listener = Listener {
dataflow_id,
node_id,
Expand All @@ -158,8 +167,13 @@ impl Listener {
Err(err) => tracing::error!("{err:?}"),
}
}
Err(err) => {
tracing::warn!("{err:?}");
(Err(err), _) => {
tracing::warn!("failed to register node {dataflow_id}/{node_id}: {err}");
}
(Ok(()), Err(err)) => {
tracing::warn!(
"failed send register reply to node {dataflow_id}/{node_id}: {err:?}"
);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions libraries/core/src/coordinator_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::daemon_messages::DataflowId;
pub enum CoordinatorRequest {
Register {
machine_id: String,
dora_version: String,
},
Event {
machine_id: String,
Expand Down
1 change: 1 addition & 0 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub enum DaemonRequest {
Register {
dataflow_id: DataflowId,
node_id: NodeId,
dora_version: String,
},
Subscribe,
SendMessage {
Expand Down

0 comments on commit 9859a2f

Please sign in to comment.