Skip to content

Commit

Permalink
Much better #[derive(ChannelSet)] implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
NotGyro committed Oct 3, 2024
1 parent 1b253ab commit 907f9dc
Show file tree
Hide file tree
Showing 8 changed files with 1,017 additions and 453 deletions.
253 changes: 190 additions & 63 deletions gestalt-core/src/common/message.rs

Large diffs are not rendered by default.

31 changes: 21 additions & 10 deletions gestalt-core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
#![feature(int_roundings)]
#![feature(inherent_associated_types)]
#![feature(array_try_from_fn)]
#![feature(trivial_bounds)]
#![allow(clippy::large_enum_variant)]

#[macro_use]
pub mod common;
pub mod main_channels;
pub use common::message;
use net::net_channels::NetSystemChannelsFields;
pub use crate::main_channels::*;
use gestalt_proc_macros::ChannelSet;
use semver::Version;
Expand Down Expand Up @@ -58,7 +60,7 @@ use crate::{
},
net::{
default_protocol_store_dir,
net_channels::{net_recv_channel, net_send_channel, NetSessionSender},
net_channels::NetMsgSender,
preprotocol::{launch_preprotocol_listener, preprotocol_connect_to_server},
reliable_udp::LaminarConfig,
NetworkSystem, SelfNetworkRole,
Expand Down Expand Up @@ -225,6 +227,11 @@ pub async fn protocol_key_change_approver(
}
}

pub fn init_channels() -> MainChannelSet {
let mut conf = ChannelCapacityConf::new();
MainChannelSet::new(&conf)
}

#[allow(unused_must_use)]
fn main() {
// Announce the engine launching, for our command-line friends.
Expand Down Expand Up @@ -283,6 +290,10 @@ fn main() {
warn!("Verbose logging CAN, OCCASIONALLY, LEAK PRIVATE INFORMATION. \n It is only recommended for debugging purposes. \n Please do not use it for general play.");
}

info!("Initializing main channel set...");
let channels = init_channels();
info!("Main channel set ready.");

let key_dir = PathBuf::from("keys/");
let keyfile_name = "identity_key";
// Load our identity key pair. Right now this will be the same on both client and server - that will change later.
Expand Down Expand Up @@ -346,8 +357,8 @@ fn main() {
info!("Setting up channels.");

async_runtime.spawn(protocol_key_change_approver(
PROTOCOL_KEY_REPORTER.receiver_subscribe(),
PROTOCOL_KEY_APPROVER.sender_subscribe(),
channels.net_channels.key_mismatch_reporter.receiver_subscribe(),
channels.net_channels.key_mismatch_approver.sender_subscribe(),
));

let mut laminar_config = LaminarConfig::default();
Expand Down Expand Up @@ -378,11 +389,9 @@ fn main() {
async_runtime.spawn(launch_preprotocol_listener(
keys,
None,
connect_sender,
3223,
protocol_store_dir,
PROTOCOL_KEY_REPORTER.clone(),
PROTOCOL_KEY_APPROVER.clone(),
channels.net_channels.build_subset(SubsetBuilder::new(())).unwrap(),
));

info!("Spawning network system task.");
Expand All @@ -391,10 +400,12 @@ fn main() {
let mut sys = NetworkSystem::new(
SelfNetworkRole::Server,
udp_address,
connect_receiver,
keys_for_net,
laminar_config,
Duration::from_millis(25),
channels.net_channels.build_subset(NetSystemChannelsFields {
recv_internal_connections: channels.net_channels.internal_connect.take_receiver().unwrap(),
}.into()).unwrap(),
)
.await
.unwrap();
Expand Down Expand Up @@ -506,7 +517,7 @@ fn main() {

std::thread::sleep(Duration::from_millis(50));

let voxel_event_sender: NetSessionSender<VoxelChangeRequest> =
let voxel_event_sender: NetMsgSender<VoxelChangeRequest> =
net_send_channel::subscribe_sender(&server_identity).unwrap();

let mut client_join_receiver_from_server =
Expand Down Expand Up @@ -557,8 +568,8 @@ fn main() {
);*/
} else {
let (voxel_event_sender, mut voxel_event_receiver) = tokio::sync::broadcast::channel(4096);
let voxel_event_sender: NetSessionSender<VoxelChangeRequest> =
NetSessionSender::new(voxel_event_sender);
let voxel_event_sender: NetMsgSender<VoxelChangeRequest> =
NetMsgSender::new(voxel_event_sender);

let client_voxel_receiver_from_server =
net_recv_channel::subscribe::<VoxelChangeAnnounce>().unwrap();
Expand Down
18 changes: 18 additions & 0 deletions gestalt-core/src/main_channels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use gestalt_proc_macros::ChannelSet;

use crate::net::net_channels::EngineNetChannels;

use crate::ChannelCapacityConf;

#[derive(ChannelSet)]
pub struct MainChannelSet {
pub net_channels: EngineNetChannels,
}

impl MainChannelSet {
pub fn new(conf: &ChannelCapacityConf) -> Self {
Self {
net_channels: EngineNetChannels::new(conf)
}
}
}
57 changes: 38 additions & 19 deletions gestalt-core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use log::error;
use log::info;
use log::trace;
use log::warn;
use net_channels::NetSystemChannels;
use net_channels::SessionChannelsFields;
use std::collections::HashMap;

use snow::StatelessTransportState;
Expand All @@ -21,8 +23,8 @@ use crate::common::identity::IdentityKeyPair;
use crate::common::identity::NodeIdentity;
use crate::message::MessageSender;
use crate::message::QuitReceiver;
use crate::net::net_channels::net_send_channel;
use crate::net::net_channels::CONNECTED;
use crate::BuildSubset;
use crate::SubsetBuilder;

use base64::engine::general_purpose::URL_SAFE as BASE_64;

Expand All @@ -45,7 +47,6 @@ pub use netmsg::PacketIntermediary;
pub use netmsg::SelfNetworkRole;
pub use netmsg::DISCONNECT_RESERVED;

use self::net_channels::INBOUND_NET_MESSAGES;
use self::netmsg::CiphertextEnvelope;
use self::netmsg::OuterEnvelopeError;
use self::reliable_udp::*;
Expand Down Expand Up @@ -86,6 +87,29 @@ impl SuccessfulConnect {
}
}

/// Represents a client who we are ready to interact with
/// (i.e. UDP session is established and ready to go)
#[derive(Debug, Clone)]
pub struct ConnectAnnounce {
pub peer_identity: NodeIdentity,
pub peer_role: NetworkRole,
}

impl From<&SuccessfulConnect> for ConnectAnnounce {
fn from(value: &SuccessfulConnect) -> Self {
ConnectAnnounce {
peer_identity: value.peer_identity.clone(),
peer_role: value.peer_role.clone(),
}
}
}

#[derive(Clone, Debug)]
pub struct DisconnectAnnounce {
pub peer_identity: NodeIdentity,
pub peer_role: NetworkRole,
}

#[derive(thiserror::Error, Debug)]
pub enum NetworkError {
#[error("Error encountered encoding or decoding an outer envelope: {0:?}")]
Expand All @@ -99,16 +123,14 @@ pub enum NetworkError {
pub struct NetworkSystem {
pub our_role: SelfNetworkRole,
socket: UdpSocket,
pub new_connections: mpsc::UnboundedReceiver<SuccessfulConnect>,
pub local_identity: IdentityKeyPair,
pub laminar_config: LaminarConfig,
pub session_tick_interval: Duration,
/// Used by servers to hold on to client info until we can ascertain their new port number (the TCP port number from preprotocol/handshake got dropped)
anticipated_clients: HashMap<PartialSessionName, SuccessfulConnect>,
recv_buf: Vec<u8>,
send_buf: Vec<u8>,
push_sender: PushSender,
push_receiver: PushReceiver,
channels: NetSystemChannels,
/// One receiver for each session. Messages come into this UDP handler from sessions, and we have to send them.
/// Remember, "Multiple producer single receiver." This is the single receiver.
/// Per-session channels for routing incoming UDP packets to sessions.
Expand All @@ -128,12 +150,11 @@ impl NetworkSystem {
pub async fn new(
our_role: SelfNetworkRole,
address: SocketAddr,
new_connections: mpsc::UnboundedReceiver<SuccessfulConnect>,
local_identity: IdentityKeyPair,
laminar_config: LaminarConfig,
session_tick_interval: Duration,
channels: NetSystemChannels,
) -> Result<Self, std::io::Error> {
let (push_sender, push_receiver): (PushSender, PushReceiver) = mpsc::unbounded_channel();
let (kill_from_inside_session_sender, kill_from_inside_session_receiver) =
mpsc::unbounded_channel::<(FullSessionName, Vec<SessionLayerError>)>();

Expand All @@ -147,15 +168,13 @@ impl NetworkSystem {
Ok(Self {
our_role,
socket,
new_connections,
local_identity,
laminar_config,
session_tick_interval,
anticipated_clients: HashMap::default(),
recv_buf: vec![0u8; MAX_MESSAGE_SIZE],
send_buf: vec![0u8; MAX_MESSAGE_SIZE],
push_sender,
push_receiver,
channels,
inbound_channels: HashMap::default(),
kill_from_inside_session_sender,
kill_from_inside_session_receiver,
Expand All @@ -176,8 +195,8 @@ impl NetworkSystem {
);

//Communication with the rest of the engine.
net_channels::register_peer(&connection.peer_identity);
match net_send_channel::subscribe_receiver(&connection.peer_identity) {
let resl_peer = self.channels.net_msg_outbound.register_peer(connection.peer_identity.clone());
match resl_peer {
Ok(receiver) => {
let peer_identity = connection.peer_identity.clone();
trace!("Sender channel successfully registered for {}", peer_identity.to_base64());
Expand All @@ -188,8 +207,10 @@ impl NetworkSystem {
actual_address.peer_address,
connection,
self.laminar_config.clone(),
self.push_sender.clone(),
Instant::now(),
self.channels.build_subset(SubsetBuilder::new(SessionChannelsFields{
net_msg_outbound: receiver,
})).unwrap()
);

// Make a channel
Expand Down Expand Up @@ -503,10 +524,8 @@ mod test {
use crate::message::SenderSubscribe;
use crate::message_types::JoinDefaultEntry;
use crate::net::handshake::approver_no_mismatch;
use crate::net::net_channels::net_recv_channel;
use crate::net::net_channels::net_recv_channel::NetMsgReceiver;

use super::net_channels::NetSessionSender;
use super::net_channels::NetMsgSender;
use super::preprotocol::launch_preprotocol_listener;
use super::preprotocol::preprotocol_connect_to_server;
use super::*;
Expand Down Expand Up @@ -649,7 +668,7 @@ mod test {
let test = TestNetMsg {
message: String::from("Boop!"),
};
let client_to_server_sender: NetSessionSender<TestNetMsg> =
let client_to_server_sender: NetMsgSender<TestNetMsg> =
net_send_channel::subscribe_sender(&server_key_pair.public).unwrap();
client_to_server_sender.send(test.clone()).unwrap();
info!("Attempting to send a message to server {}", server_key_pair.public.to_base64());
Expand All @@ -670,7 +689,7 @@ mod test {
let test_reply = TestNetMsg {
message: String::from("Beep!"),
};
let server_to_client_sender: NetSessionSender<TestNetMsg> =
let server_to_client_sender: NetMsgSender<TestNetMsg> =
net_send_channel::subscribe_sender(&client_key_pair.public).unwrap();
info!("Attempting to send a message to client {}", client_key_pair.public.to_base64());
server_to_client_sender.send(test_reply.clone()).unwrap();
Expand Down
Loading

0 comments on commit 907f9dc

Please sign in to comment.