Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check that coordinator, daemon, and node versions match #245

Merged
merged 2 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apis/rust/node/src/daemon_connection/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl DaemonChannel {
let msg = DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
};
let reply = self
.request(&msg)
Expand Down
50 changes: 32 additions & 18 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,28 +105,41 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
Event::DaemonConnectError(err) => {
tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
}
Event::Daemon(event) => {
match event {
DaemonEvent::Register {
machine_id,
mut connection,
} => {
let reply = RegisterResult::Ok;
match tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await {
Ok(()) => {
let previous =
daemon_connections.insert(machine_id.clone(), connection);
if let Some(_previous) = previous {
tracing::info!("closing previous connection `{machine_id}` on new register");
}
}
Err(err) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
Event::Daemon(event) => match event {
DaemonEvent::Register {
machine_id,
mut connection,
dora_version: daemon_version,
} => {
let coordinator_version = &env!("CARGO_PKG_VERSION");
let reply = if &daemon_version == coordinator_version {
RegisterResult::Ok
} else {
RegisterResult::Err(format!(
"version mismatch: daemon v{daemon_version} is \
not compatible with coordinator v{coordinator_version}"
))
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply, send_result) {
(RegisterResult::Ok, Ok(())) => {
let previous =
daemon_connections.insert(machine_id.clone(), connection);
if let Some(_previous) = previous {
tracing::info!(
"closing previous connection `{machine_id}` on new register"
);
}
}
(RegisterResult::Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
}
(RegisterResult::Ok, Err(err)) => {
tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
}
}
}
}
},
Event::Dataflow { uuid, event } => match event {
DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => {
match running_dataflows.entry(uuid) {
Expand Down Expand Up @@ -564,5 +577,6 @@ pub enum DaemonEvent {
Register {
machine_id: String,
connection: TcpStream,
dora_version: String,
},
}
6 changes: 5 additions & 1 deletion binaries/coordinator/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende

// handle the message and translate it to a DaemonEvent
match message {
coordinator_messages::CoordinatorRequest::Register { machine_id } => {
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,
} => {
let event = DaemonEvent::Register {
machine_id,
connection,
dora_version,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;
Expand Down
5 changes: 4 additions & 1 deletion binaries/daemon/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub async fn register(
stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
let register = serde_json::to_vec(&CoordinatorRequest::Register { machine_id })?;
let register = serde_json::to_vec(&CoordinatorRequest::Register {
machine_id,
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
})?;
tcp_send(&mut stream, &register)
.await
.wrap_err("failed to send register request to dora-coordinator")?;
Expand Down
30 changes: 22 additions & 8 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,23 @@ impl Listener {
DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: node_api_version,
} => {
let reply = DaemonReply::Result(Ok(()));
match connection
.send_reply(reply)
let daemon_version = env!("CARGO_PKG_VERSION");
let result = if node_api_version == daemon_version {
Ok(())
} else {
Err(format!(
"version mismatch: node API v{node_api_version} is not compatible \
with daemon v{daemon_version}"
))
};
let send_result = connection
.send_reply(DaemonReply::Result(result.clone()))
.await
.wrap_err("failed to send register reply")
{
Ok(()) => {
.wrap_err("failed to send register reply");
match (result, send_result) {
(Ok(()), Ok(())) => {
let mut listener = Listener {
dataflow_id,
node_id,
Expand All @@ -158,8 +167,13 @@ impl Listener {
Err(err) => tracing::error!("{err:?}"),
}
}
Err(err) => {
tracing::warn!("{err:?}");
(Err(err), _) => {
tracing::warn!("failed to register node {dataflow_id}/{node_id}: {err}");
}
(Ok(()), Err(err)) => {
tracing::warn!(
"failed send register reply to node {dataflow_id}/{node_id}: {err:?}"
);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions libraries/core/src/coordinator_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::daemon_messages::DataflowId;
pub enum CoordinatorRequest {
Register {
machine_id: String,
dora_version: String,
},
Event {
machine_id: String,
Expand Down
1 change: 1 addition & 0 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub enum DaemonRequest {
Register {
dataflow_id: DataflowId,
node_id: NodeId,
dora_version: String,
},
Subscribe,
SendMessage {
Expand Down