Skip to content

Commit

Permalink
session_with_localhost() rides again
Browse files Browse the repository at this point in the history
  • Loading branch information
NotGyro committed Oct 5, 2024
1 parent 61e3196 commit 639d864
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 32 deletions.
34 changes: 17 additions & 17 deletions gestalt-core/src/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use log::{error, info, trace};
use tokio::sync::broadcast::error::TryRecvError as BroadcastTryRecvError;
use tokio::sync::mpsc::error::TryRecvError as MpscTryRecvError;
use tokio::sync::{broadcast, mpsc};
use tokio::time::Instant;

use crate::world::WorldId;

Expand Down Expand Up @@ -995,28 +996,27 @@ pub async fn quit_game(deadline: Duration) -> Result<(), SendError> {
num_receivers
);

let mut timeout_future = Box::pin(tokio::time::sleep(deadline));

let mut count_received = 0;

let start = Instant::now();
let timeout_deadline = start + deadline;

while count_received < num_receivers {
tokio::select! {
reply_maybe = ready_receiver.recv_wait() => {
match reply_maybe {
Ok(_) => {
trace!("Received {} quit ready notifications.", count_received);
count_received += 1;
}
Err(e) => {
error!("Error polling for READY_FOR_QUIT messages, exiting immediately. Error was: {:?}", e);
return Ok(());
}
match tokio::time::timeout_at(timeout_deadline,ready_receiver.recv_wait()).await {
Ok(reply_maybe) => match reply_maybe {
Ok(_) => {
trace!("Received {} quit ready notifications.", count_received);
count_received += 1;
}
}
_ = (&mut timeout_future) => {
error!("Waiting for disparate parts of the engine to be ready for quit took longer than {:?}, exiting immediately.", deadline);
Err(e) => {
error!("Error polling for READY_FOR_QUIT messages, exiting immediately. Error was: {:?}", e);
return Ok(());
}
},
Err(_e) => {
error!("Waiting for disparate parts of the engine to be ready for quit took longer than {timeout_deadline:?}, exiting immediately.");
return Ok(());
}
},
}
}

Expand Down
2 changes: 0 additions & 2 deletions gestalt-core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,6 @@ fn main() {
connect_receiver.recv_wait().await
}).unwrap();

std::thread::sleep(Duration::from_millis(50));

let mut peer_joins_notif = channels.net_channels.net_msg_inbound.receiver_typed::<JoinAnnounce>().unwrap();

async_runtime.spawn(async move {
Expand Down
28 changes: 21 additions & 7 deletions gestalt-core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,12 @@ use crate::SubsetBuilder;
}

#[tokio::test]
#[ignore] //Ignored until cause of GH Actions test flakiness can be ascertained.
//#[ignore] //Ignored until cause of GH Actions test flakiness can be ascertained.
async fn session_with_localhost() {
// Init stuff
let mutex_guard = NET_TEST_MUTEX.lock().await;
let _log = TermLogger::init(
LevelFilter::Trace,
LevelFilter::Info,
simplelog::Config::default(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
Expand All @@ -560,19 +560,24 @@ use crate::SubsetBuilder;
tokio::spawn(approver_no_mismatch(client_channel_set.key_mismatch_reporter.receiver_subscribe(), client_channel_set.key_mismatch_approver.sender_subscribe()));

// Port/binding stuff.
let start_find_port = tokio::time::Instant::now();
let port = find_available_udp_port(54134..54534).await.unwrap();
info!("Binding on port {}", port);
info!("Finding a port took {:?}", start_find_port.elapsed());

let server_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
let server_socket_addr = SocketAddr::new(server_addr, port);

let start_netmsgs = tokio::time::Instant::now();
let test_table = tokio::task::spawn_blocking(|| generated::get_netmsg_table())
.await
.unwrap();
println!("Counted {} registered NetMsg types.", test_table.len());
info!("Building a netmsg table took {:?}", start_netmsgs.elapsed());

//Actually start doing the test here:
//Launch server
let server_start = tokio::time::Instant::now();
let subset = server_channel_set.build_subset(SubsetBuilder::new(())).unwrap();
let join_handle_s = tokio::spawn(async move {
let mut sys = NetworkSystem::new(
Expand Down Expand Up @@ -621,9 +626,15 @@ use crate::SubsetBuilder;
)
.await
.unwrap();

info!("Starting a client and a server, and connecting the client to the server, took {:?}", server_start.elapsed());
let recv_connected = tokio::time::Instant::now();
let connected_peer = connected_to_client.recv_wait().await.unwrap();
assert!(connected_peer.peer_identity == server_key_pair.public);
info!("Waiting for the server to notify the client that we're connected took {:?}", recv_connected.elapsed());

info!("Client connected to peer {:?} with role, {:?}", &connected_peer.peer_identity, &connected_peer.peer_role);

let post_handshake = tokio::time::Instant::now();

let client_net_send = client_channel_set.net_msg_outbound.sender_subscribe_domain(&connected_peer.peer_identity).unwrap();
client_net_send.send(
Expand All @@ -632,7 +643,8 @@ use crate::SubsetBuilder;
}.construct_packet().unwrap()
).unwrap();

let mut test_receiver = client_channel_set.net_msg_inbound.receiver_typed::<TestNetMsg>().unwrap();
let mut server_test_receiver = server_channel_set.net_msg_inbound.receiver_typed::<TestNetMsg>().unwrap();
let mut client_test_receiver = client_channel_set.net_msg_inbound.receiver_typed::<TestNetMsg>().unwrap();

let test = TestNetMsg {
message: String::from("Boop!"),
Expand All @@ -642,7 +654,7 @@ use crate::SubsetBuilder;
info!("Attempting to send a message to server {}", server_key_pair.public.to_base64());

{
let out = tokio::time::timeout(Duration::from_secs(5), test_receiver.recv_wait())
let out = tokio::time::timeout(Duration::from_secs(5), server_test_receiver.recv_wait())
.await
.unwrap()
.unwrap();
Expand All @@ -662,7 +674,7 @@ use crate::SubsetBuilder;
server_to_client_sender.send(test_reply.construct_packet().unwrap()).unwrap();

{
let out = tokio::time::timeout(Duration::from_secs(5), test_receiver.recv_wait())
let out = tokio::time::timeout(Duration::from_secs(5), client_test_receiver.recv_wait())
.await
.unwrap()
.unwrap();
Expand All @@ -674,7 +686,9 @@ use crate::SubsetBuilder;
assert_eq!(out.message, test_reply.message);
}

quit_game(Duration::from_millis(500)).await.unwrap();
info!("All behavior between the end of init&handshake, and the beginning of shutdown, took {:?}", post_handshake.elapsed());

quit_game(Duration::from_millis(50)).await.unwrap();

let _ = join_handle_s.abort();
let _ = join_handle_c.abort();
Expand Down
9 changes: 7 additions & 2 deletions gestalt-core/src/net/net_channels.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::marker::PhantomData;

use gestalt_proc_macros::ChannelSet;
use log::warn;

use crate::{
common::identity::NodeIdentity, message::{MessageSender, MpscSender, SendError}, BroadcastChannel, BroadcastReceiver, BroadcastSender, ChannelCapacityConf, ChannelInit, DomainMessageSender, DomainMultiChannel, DomainSenderSubscribe, DomainSubscribeErr, DomainTakeReceiver, MessageReceiver, MessageReceiverAsync, MpscChannel, MpscReceiver, MultiDomainSender, NewDomainErr, ReceiverChannel, SenderChannel, StaticChannelAtom
Expand Down Expand Up @@ -325,8 +326,12 @@ pub struct NetSystemChannels {
impl NetSystemChannels {
pub fn init_peer(&self, session: FullSessionName, ident: NodeIdentity) {
self.net_msg_outbound.init_peer(ident);
self.raw_to_session.init_domain(session.clone());
self.system_kill_session.init_domain(session);
if let Err(_) = self.raw_to_session.init_domain(session.clone()) {
warn!("Session {session:?} was already initialized, but an attempt was made to initialize it again. This should proceed without problems but may be a sign of bugs elsewhere.")
}
if let Err(_) = self.system_kill_session.init_domain(session) {
warn!("Session {session:?} was already initialized, but an attempt was made to initialize it again. This should proceed without problems but may be a sign of bugs elsewhere.")
}
}
pub fn drop_peer(&self, session: &FullSessionName, ident: &NodeIdentity) {
self.net_msg_outbound.drop_peer(ident);
Expand Down
16 changes: 12 additions & 4 deletions gestalt-core/src/net/preprotocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use log::{error, info, trace};
use parking_lot::Mutex;
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use std::collections::HashSet;
use std::path::PathBuf;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -193,6 +194,7 @@ pub struct PreProtocolReceiver {
peer_role: Option<NetworkRole>,
mismatch_reporter: Option<NewProtocolKeyReporter>,
mismatch_approver: Option<NewProtocolKeyApprover>,
start_time: Instant,
}

impl PreProtocolReceiver {
Expand All @@ -210,8 +212,12 @@ impl PreProtocolReceiver {
peer_role: None,
mismatch_reporter: Some(mismatch_reporter),
mismatch_approver: Some(mismatch_approver),
start_time: Instant::now(),
}
}
pub fn elapsed(&self) -> Duration {
self.start_time.elapsed()
}
pub fn is_handshake_done(&self) -> bool {
match &self.state {
PreProtocolReceiverState::QueryAnswerer => false,
Expand Down Expand Up @@ -411,7 +417,7 @@ pub async fn preprotocol_receiver_session(
transport_counter: seq as u32,
};

info!("A connection to this server was successfully made by client {}", completed.peer_identity.to_base64());
info!("A connection to this server was successfully made by client {}, which took {:?}", completed.peer_identity.to_base64(), receiver.elapsed());
internal_connect.send(completed).unwrap();
// Done with this part, stop sending.
false
Expand Down Expand Up @@ -642,6 +648,7 @@ pub async fn preprotocol_connect_to_server(
channels: PreprotocolSessionChannels,
) -> Result<(), HandshakeError> {
let PreprotocolSessionChannels { internal_connect, key_mismatch_reporter, key_mismatch_approver } = channels;
let start_time = tokio::time::Instant::now();
match tokio::time::timeout(connect_timeout, TcpStream::connect(&server_address)).await {
Ok(Ok(mut stream)) => {
// TODO figure out how connections where the initiator will be a non-client at some point
Expand All @@ -658,8 +665,9 @@ pub async fn preprotocol_connect_to_server(
{
Ok(completed_connection) => {
info!(
"Successfully initiated connection to a server with identity {}",
completed_connection.peer_identity.to_base64()
"Successfully initiated connection to a server with identity {}, which took {:?}",
completed_connection.peer_identity.to_base64(),
start_time.elapsed(),
);
stream.shutdown().await.unwrap();
internal_connect.send(completed_connection).unwrap();
Expand Down Expand Up @@ -752,7 +760,7 @@ pub mod test {
server_channels.clone(),
));
//Give it a moment
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(10)).await;
//Try to connect
preprotocol_connect_to_server(
client_key_pair,
Expand Down

0 comments on commit 639d864

Please sign in to comment.