Skip to content

Commit

Permalink
Implement TCP mux connections
Browse files Browse the repository at this point in the history
  • Loading branch information
jkcoxson committed Jan 7, 2025
1 parent 3a23670 commit 15fd214
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/devices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ impl SharedDevices {
self.devices.insert(udid, dev);
}

pub fn get_device_by_id(&self, id: u64) -> Option<&MuxerDevice> {
self.devices.values().find(|x| x.device_id == id)
}

#[cfg(feature = "usb")]
pub fn add_usb_device(&mut self, udid: String, _data: Arc<Mutex<Self>>) {
self.last_index += 1;
Expand Down
86 changes: 83 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ async fn main() {

enum Directions {
None,
Connect,
Listen,
}

Expand Down Expand Up @@ -242,6 +241,7 @@ async fn handle_stream(
return;
}
};
trace!("Recv'd plist: {parsed:?}");

match current_directions {
Directions::None => {
Expand Down Expand Up @@ -381,14 +381,94 @@ async fn handle_stream(
return;
}
"Connect" => {
current_directions = Directions::Connect;
let connection_port = parsed
.plist
.dict_get_item("PortNumber")
.unwrap()
.get_uint_val()
.unwrap();
let device_id = parsed
.plist
.dict_get_item("DeviceID")
.unwrap()
.get_uint_val()
.unwrap();

let connection_port = connection_port as u16;
let connection_port = connection_port.to_be();

info!("Client is establishing connection to port {connection_port}");
let mut central_data = data.lock().await;
if let Some(device) = central_data.get_device_by_id(device_id) {
let network_address = device.network_address.clone();
let device_id = device.device_id.clone();
std::mem::drop(central_data);

info!("Connecting to device {}", device_id);

match network_address {
Some(ip) => {
match tokio::net::TcpStream::connect((ip, connection_port))
.await
{
Ok(mut stream) => {
let mut p = Plist::new_dict();
p.dict_set_item("MessageType", "Result".into())
.unwrap();
p.dict_set_item("Number", Plist::new_uint(0))
.unwrap();

let res = RawPacket::new(p, 1, 8, parsed.tag);
let res: Vec<u8> = res.into();
socket.write_all(&res).await.unwrap();

if let Err(e) = tokio::io::copy_bidirectional(
&mut stream,
&mut socket,
)
.await
{
info!("Bidirectional stream stopped: {e:?}");
}
return;
}
Err(e) => {
error!("Unable to connect to device {device_id} port {connection_port}: {e:?}");
let mut p = Plist::new_dict();
p.dict_set_item("MessageType", "Result".into())
.unwrap();
p.dict_set_item("Number", Plist::new_uint(1))
.unwrap();

let res = RawPacket::new(p, 1, 8, parsed.tag);
let res: Vec<u8> = res.into();
socket.write_all(&res).await.unwrap();

return;
}
}
}
None => {
unimplemented!()
}
}
} else {
let mut p = Plist::new_dict();
p.dict_set_item("MessageType", "Result".into()).unwrap();
p.dict_set_item("Number", Plist::new_uint(1)).unwrap();

let res = RawPacket::new(p, 1, 8, parsed.tag);
let res: Vec<u8> = res.into();
socket.write_all(&res).await.unwrap();

return;
}
}
_ => {
warn!("Unknown packet type");
}
}
}
Directions::Connect => todo!(),
Directions::Listen => {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand Down

0 comments on commit 15fd214

Please sign in to comment.