Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use peer_addr of incoming daemon connection for registration #1

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ enum Command {
#[clap(long)]
machine_id: Option<String>,
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0)
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
)]
bind: SocketAddr,
#[clap(long)]
Expand Down
46 changes: 28 additions & 18 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ mod tcp_utils;
pub async fn start(
bind: SocketAddr,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(SocketAddr, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let listener = listener::create_listener(bind).await?;
let bound_addr = listener
let port = listener
.local_addr()
.wrap_err("failed to get local addr of listener")?;
.wrap_err("failed to get local addr of listener")?
.port();
let mut tasks = FuturesUnordered::new();

// Setup ctrl-c handler
Expand All @@ -60,7 +61,7 @@ pub async fn start(
tracing::debug!("all spawned tasks finished, exiting..");
Ok(())
};
Ok((bound_addr, future))
Ok((port, future))
}

// Resolve the dataflow name.
Expand Down Expand Up @@ -170,29 +171,38 @@ async fn start_inner(
machine_id,
mut connection,
dora_version: daemon_version,
listen_socket,
listen_port,
} => {
let coordinator_version = &env!("CARGO_PKG_VERSION");
let reply = if &daemon_version == coordinator_version {
RegisterResult::Ok
let coordinator_version: &&str = &env!("CARGO_PKG_VERSION");
let version_check = if &daemon_version == coordinator_version {
Ok(())
} else {
RegisterResult::Err(format!(
Err(format!(
"version mismatch: daemon v{daemon_version} is \
not compatible with coordinator v{coordinator_version}"
not compatible with coordinator v{coordinator_version}"
))
};
let reply = Timestamped {
inner: reply,
let peer_ip = connection
.peer_addr()
.map(|addr| addr.ip())
.map_err(|err| format!("failed to get peer addr of connection: {err}"));
let register_result = version_check.and(peer_ip);

let reply: Timestamped<RegisterResult> = Timestamped {
inner: match &register_result {
Ok(_) => RegisterResult::Ok,
Err(err) => RegisterResult::Err(err.clone()),
},
timestamp: clock.new_timestamp(),
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply.inner, send_result) {
(RegisterResult::Ok, Ok(())) => {
match (register_result, send_result) {
(Ok(ip), Ok(())) => {
let previous = daemon_connections.insert(
machine_id.clone(),
DaemonConnection {
stream: connection,
listen_socket,
listen_socket: (ip, listen_port).into(),
last_heartbeat: Instant::now(),
},
);
Expand All @@ -202,10 +212,10 @@ async fn start_inner(
);
}
}
(RegisterResult::Err(err), _) => {
(Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
}
(RegisterResult::Ok, Err(err)) => {
(Ok(_), Err(err)) => {
tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
}
}
Expand Down Expand Up @@ -906,7 +916,7 @@ pub enum DaemonEvent {
dora_version: String,
machine_id: String,
connection: TcpStream,
listen_socket: SocketAddr,
listen_port: u16,
},
}

Expand Down
4 changes: 2 additions & 2 deletions binaries/coordinator/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ pub async fn handle_connection(
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,
listen_socket,
listen_port,
} => {
let event = DaemonEvent::Register {
dora_version,
machine_id,
connection,
listen_socket,
listen_port,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;
Expand Down
4 changes: 2 additions & 2 deletions binaries/daemon/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct CoordinatorEvent {
pub async fn register(
addr: SocketAddr,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
clock: &HLC,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
Expand All @@ -37,7 +37,7 @@ pub async fn register(
inner: CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
listen_port,
},
timestamp: clock.new_timestamp(),
})?;
Expand Down
10 changes: 5 additions & 5 deletions binaries/daemon/src/inter_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,24 @@ pub async fn spawn_listener_loop(
bind: SocketAddr,
machine_id: String,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) -> eyre::Result<SocketAddr> {
) -> eyre::Result<u16> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
}
};
let socket_addr = socket
let listen_port = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?;
.wrap_err("failed to get local addr of socket")?
.port();

tracing::debug!("inter-daemon listener starting for machine `{machine_id}` on {socket_addr}");
tokio::spawn(async move {
listener_loop(socket, events_tx).await;
tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`");
});

Ok(socket_addr)
Ok(listen_port)
}

async fn listener_loop(
Expand Down
4 changes: 2 additions & 2 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Daemon {

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
let listen_port =
inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?;
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
Expand All @@ -97,7 +97,7 @@ impl Daemon {

// connect to the coordinator
let coordinator_events =
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock)
coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock)
.await
.wrap_err("failed to connect to dora-coordinator")?
.map(
Expand Down
5 changes: 3 additions & 2 deletions examples/multiple-daemons/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ async fn main() -> eyre::Result<()> {

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let coordinator_bind = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_DEFAULT,
);
let (coordinator_addr, coordinator) =
let (coordinator_port, coordinator) =
dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx))
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into());
let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into());

Expand Down
3 changes: 1 addition & 2 deletions libraries/core/src/coordinator_messages.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::daemon_messages::DataflowId;
use eyre::eyre;
use std::net::SocketAddr;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
Register {
dora_version: String,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
},
Event {
machine_id: String,
Expand Down