Skip to content

Commit

Permalink
add logs for connection states
Browse files — browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Nov 9, 2023
1 parent 8d4e00b commit 763ef04
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 8 deletions.
11 changes: 8 additions & 3 deletions interactive_engine/executor/engine/pegasus/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,12 @@ pub fn ipc_channel_recv<T: Codec + 'static>(
}

/// Reports whether IPC between `local` and every server in `remotes` is ready.
///
/// Readiness is the conjunction of three checks: the connection state
/// (`state::check_connect`), the send side (`send::check_remotes_send_ready`)
/// and the receive side (`receive::check_remotes_read_ready`).
///
/// NOTE(review): this span is a diff hunk rendered without +/- markers. Per the
/// hunk header (8 additions & 3 deletions), the three-line `&&` chain directly
/// below is the body removed by this commit, and everything from `let f1` to
/// `ret` is the body that replaces it.
pub fn check_ipc_ready(local: u64, remotes: &[u64]) -> bool {
// Removed by this commit (next three lines): a single short-circuiting `&&`
// chain, so later checks were skipped once an earlier one returned false.
crate::state::check_connect(local, remotes)
&& crate::send::check_remotes_send_ready(local, remotes)
&& crate::receive::check_remotes_read_ready(local, remotes)
// Added by this commit (through `ret`): each check is evaluated unconditionally
// and kept in its own flag so the warning can report which one failed.
let f1 = crate::state::check_connect(local, remotes);
let f2 = crate::send::check_remotes_send_ready(local, remotes);
let f3 = crate::receive::check_remotes_read_ready(local, remotes);
let ret = f1 && f2 && f3;
if !ret {
// Flags are printed in order: connection state, send ready, read ready.
warn!("IPC not ready {}, {}, {}", f1, f2, f3);
}
ret
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub fn check_remotes_read_ready(local: u64, remotes: &[u64]) -> bool {
for id in remotes.iter() {
if *id != local {
if !lock.contains_key(&(local, *id)) {
warn!("remote {} is not ready.", *id);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Inbox {
self.buffer.push(msg);
} else {
if let Err(_) = unsafe { (*tx).send(msg) } {
error!("Inbox#push: send data failure;");
error!("Channel {}, Inbox#push: send data failure;", self.channel_id);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ pub fn check_remotes_send_ready(local: u64, remotes: &[u64]) -> bool {
for id in remotes {
if *id != local {
if !lock.contains_key(&(local, *id)) {
warn!("remote {} is not ready.", *id);
return false;
}
}
Expand Down
Expand Up @@ -216,6 +217,7 @@ pub(crate) fn start_net_sender(
.name(format!("net-sender-{}", remote.id))
.spawn(move || {
busy_send(&mut net_tx, is_block, timeout, local_id, remote.id, recv_poisoned);
error!("Connection to server {} lost", remote.id);
disconnected.store(true, Ordering::SeqCst);
net_tx
.take_writer()
Expand All
@@ -233,6 +235,7 @@ pub(crate) fn start_net_sender(
.name(format!("net-sender-{}", remote.id))
.spawn(move || {
busy_send(&mut net_tx, is_block, timeout, local_id, remote.id, recv_poisoned);
error!("Connection to server {} lost", remote.id);
disconnected.store(true, Ordering::SeqCst);
net_tx
.take_writer()
Expand Down
10 changes: 8 additions & 2 deletions interactive_engine/executor/engine/pegasus/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,21 @@ pub fn is_connected(local_id: u64, remote_id: u64) -> bool {

/// Reports whether every server in `remotes` other than `local` has a
/// connection-state entry under `(local, id)` that reports `is_connected()`.
/// A missing entry counts as not connected.
///
/// # Panics
///
/// Panics with "lock poisoned" if acquiring the read lock on
/// `CONNECTION_STATES` fails.
///
/// NOTE(review): this span is a diff hunk rendered without +/- markers. Per the
/// hunk header (8 additions & 2 deletions), the lone `return false;` and the
/// lone `true` are the two lines removed by this commit; the `connect_status` /
/// `disconnected_servers` lines are the additions.
pub fn check_connect(local: u64, remotes: &[u64]) -> bool {
let states = CONNECTION_STATES.read().expect("lock poisoned");
// Added: instead of bailing out on the first failure, collect every
// disconnected id so they can all be reported in one log line.
// NOTE(review): `connect_status` appears to mirror
// `disconnected_servers.is_empty()`; it could likely be dropped.
let mut connect_status = true;
let mut disconnected_servers = vec![];
for id in remotes {
// The local server itself is skipped; only remote peers are checked.
if *id != local
&& !states
.get(&(local, *id))
.map(|s| s.is_connected())
.unwrap_or(false)
{
// Removed by this commit: early exit on the first disconnected peer.
return false;
connect_status = false;
disconnected_servers.push(*id);
}
}
// Removed by this commit: unconditional success after the loop.
true
if !connect_status {
// NOTE(review): `wait_servers_ready` polls `check_ipc_ready` (which calls this
// function) in a sleep loop, so this error-level line presumably repeats on
// every poll while peers are still starting up — confirm the level is intended.
error!("Servers {:?} are not connected", disconnected_servers);
}
connect_status
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub fn connect<A: ToSocketAddrs>(
debug!("connect to server {:?};", addr);
let hb_sec = params.get_hb_interval_sec();
super::setup_connection(local_id, hb_sec, &mut conn)?;
debug!("setup connection to {:?} success;", addr);
info!("setup connection to {:?} success;", addr);
if let Some((id, hb_sec)) = super::check_connection(&mut conn)? {
if id == remote_id {
info!("connect server {} on {:?} success;", remote_id, addr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub fn wait_servers_ready(server_conf: &ServerConf) {
};
if !remotes.is_empty() {
while !pegasus_network::check_ipc_ready(local, &remotes) {
std::thread::sleep(std::time::Duration::from_millis(100));
std::thread::sleep(std::time::Duration::from_millis(1000));
info!("waiting remote servers connect ...");
}
}
Expand Down

0 comments on commit 763ef04

Please sign in to comment.