Skip to content

Commit

Permalink
Remove unnecessary sharing of UdpState
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Mar 4, 2023
1 parent f8a8234 commit d38ec46
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 17 deletions.
14 changes: 5 additions & 9 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use tokio::{
};
use tokio_util::time::delay_queue;
use tracing::debug_span;
use udp::UdpState;

use crate::{
mutex::Mutex,
Expand All @@ -45,7 +44,6 @@ impl Connecting {
dirty: mpsc::UnboundedSender<ConnectionHandle>,
handle: ConnectionHandle,
conn: proto::Connection,
udp_state: Arc<UdpState>,
) -> (Connecting, ConnectionRef) {
let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
let (on_connected_send, on_connected_recv) = oneshot::channel();
Expand All @@ -55,7 +53,6 @@ impl Connecting {
dirty,
on_handshake_data_send,
on_connected_send,
udp_state,
);

(
Expand Down Expand Up @@ -698,7 +695,6 @@ impl ConnectionRef {
dirty: mpsc::UnboundedSender<ConnectionHandle>,
on_handshake_data: oneshot::Sender<()>,
on_connected: oneshot::Sender<bool>,
udp_state: Arc<UdpState>,
) -> Self {
let _ = dirty.send(handle);
Self(Arc::new(ConnectionInner {
Expand All @@ -719,7 +715,6 @@ impl ConnectionRef {
stopped: FxHashMap::default(),
error: None,
ref_count: 0,
udp_state,
}),
shared: Shared::default(),
}))
Expand Down Expand Up @@ -799,16 +794,17 @@ pub(crate) struct State {
pub(crate) error: Option<ConnectionError>,
/// Number of live handles that can be used to initiate or handle I/O; excludes the driver
ref_count: usize,
udp_state: Arc<UdpState>,
}

impl State {
pub(crate) fn drive_transmit(&mut self, out: &mut VecDeque<proto::Transmit>) -> bool {
pub(crate) fn drive_transmit(
&mut self,
out: &mut VecDeque<proto::Transmit>,
max_datagrams: usize,
) -> bool {
let now = Instant::now();
let mut transmits = 0;

let max_datagrams = self.udp_state.max_gso_segments();

while let Some(t) = self.inner.poll_transmit(now, max_datagrams) {
transmits += match t.segment_size {
None => 1,
Expand Down
14 changes: 6 additions & 8 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,8 @@ impl Endpoint {
addr
};
let (ch, conn) = endpoint.inner.connect(config, addr, server_name)?;
let udp_state = endpoint.udp_state.clone();
let dirty = endpoint.dirty_send.clone();
Ok(endpoint.connections.insert(dirty, ch, conn, udp_state))
Ok(endpoint.connections.insert(dirty, ch, conn))
}

/// Switch to a new UDP socket
Expand Down Expand Up @@ -337,7 +336,7 @@ pub(crate) struct EndpointInner {
#[derive(Debug)]
pub(crate) struct State {
socket: Box<dyn AsyncUdpSocket>,
udp_state: Arc<UdpState>,
udp_state: UdpState,
inner: proto::Endpoint,
outgoing: VecDeque<proto::Transmit>,
incoming: VecDeque<Connecting>,
Expand Down Expand Up @@ -397,7 +396,6 @@ impl State {
self.dirty_send.clone(),
handle,
conn,
self.udp_state.clone(),
);
self.incoming.push_back(conn);
}
Expand Down Expand Up @@ -503,6 +501,7 @@ impl State {
dirty_buffer.push(conn_handle);
}

let max_datagrams = self.udp_state.max_gso_segments();
let mut drained = Vec::new();
for conn_handle in dirty_buffer {
let conn = match self.connections.refs.get(&conn_handle) {
Expand All @@ -512,7 +511,7 @@ impl State {
let mut state = conn.state.lock("poll dirty");
state.is_dirty = false;
let _guard = state.span.clone().entered();
let mut keep_conn_going = state.drive_transmit(&mut self.outgoing);
let mut keep_conn_going = state.drive_transmit(&mut self.outgoing, max_datagrams);
if let Some(deadline) = state.inner.poll_timeout() {
let deadline = tokio::time::Instant::from(deadline);
if Some(deadline) != state.timer_deadline {
Expand Down Expand Up @@ -566,9 +565,8 @@ impl ConnectionSet {
dirty: mpsc::UnboundedSender<ConnectionHandle>,
handle: ConnectionHandle,
conn: proto::Connection,
udp_state: Arc<UdpState>,
) -> Connecting {
let (future, conn) = Connecting::new(dirty, handle, conn, udp_state);
let (future, conn) = Connecting::new(dirty, handle, conn);
if let Some((error_code, ref reason)) = self.close {
let mut state = conn.state.lock("close");
state.close(error_code, reason.clone(), &conn.shared);
Expand Down Expand Up @@ -630,7 +628,7 @@ pub(crate) struct EndpointRef(Arc<EndpointInner>);

impl EndpointRef {
pub(crate) fn new(socket: Box<dyn AsyncUdpSocket>, inner: proto::Endpoint, ipv6: bool) -> Self {
let udp_state = Arc::new(UdpState::new());
let udp_state = UdpState::new();
let recv_buf = vec![
0;
inner.config().get_max_udp_payload_size().min(64 * 1024) as usize
Expand Down

0 comments on commit d38ec46

Please sign in to comment.