Skip to content

Commit

Permalink
Pacing: Pacing of egress QUIC packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Lohith Bellad committed Dec 5, 2020
1 parent 26871b7 commit d53b9e5
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 0 deletions.
5 changes: 5 additions & 0 deletions include/quiche.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len);
// Writes a single QUIC packet to be sent to the peer.
ssize_t quiche_conn_send(quiche_conn *conn, uint8_t *out, size_t out_len);

// Writes a single QUIC packet to be sent to the peer and fills in the send_time
// of the packet.
ssize_t quiche_conn_send_at(quiche_conn *conn, uint8_t *out, size_t out_len,
struct timespec *send_time);

// Buffer holding data at a specific offset.
typedef struct RangeBuf quiche_rangebuf;

Expand Down
65 changes: 65 additions & 0 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::sync::atomic;

#[cfg(unix)]
use std::os::unix::io::FromRawFd;
use libc::timespec;

use libc::c_char;
use libc::c_int;
Expand Down Expand Up @@ -241,6 +242,11 @@ pub extern fn quiche_config_enable_hystart(config: &mut Config, v: bool) {
config.enable_hystart(v);
}

#[no_mangle]
pub extern fn quiche_config_enable_pacing(config: &mut Config, v: bool) {
config.enable_pacing(v);
}

#[no_mangle]
pub extern fn quiche_config_enable_dgram(
config: &mut Config, enabled: bool, recv_queue_len: size_t,
Expand Down Expand Up @@ -543,6 +549,32 @@ pub extern fn quiche_conn_send(
}
}

#[no_mangle]
#[cfg(unix)]
pub extern fn quiche_conn_send_at(
conn: &mut Connection, out: *mut u8, out_len: size_t,
send_time: &mut timespec,
) -> ssize_t {
if out_len > <ssize_t>::max_value() as usize {
panic!("The provided buffer is too large");
}

let out = unsafe { slice::from_raw_parts_mut(out, out_len) };
let mut schedule_time = time::Instant::now();

let written = match conn.send_at(out, &mut schedule_time) {
Ok(v) => v as ssize_t,

Err(e) => e.to_c(),
};

if written > 0 {
*send_time = instant_to_timespec(schedule_time);
}

written
}

#[no_mangle]
pub extern fn quiche_conn_stream_recv(
conn: &mut Connection, stream_id: u64, out: *mut u8, out_len: size_t,
Expand Down Expand Up @@ -841,3 +873,36 @@ pub extern fn quiche_conn_dgram_purge_outgoing(
pub extern fn quiche_conn_free(conn: *mut Connection) {
unsafe { Box::from_raw(conn) };
}

#[cfg(unix)]
fn instant_to_timespec(instant: std::time::Instant) -> timespec {
use std::hash::{
Hash,
Hasher,
};

struct ToTimeSpec(Vec<i64>);
impl Hasher for ToTimeSpec {
fn finish(&self) -> u64 {
unimplemented!();
}

fn write(&mut self, _: &[u8]) {
unimplemented!();
}

fn write_u64(&mut self, i: u64) {
self.0.push(i as _);
}

fn write_u32(&mut self, i: u32) {
self.0.push(i as _);
}
}
let mut hasher = ToTimeSpec(vec![]);
instant.hash(&mut hasher);
libc::timespec {
tv_sec: hasher.0[0] as _,
tv_nsec: hasher.0[1] as _,
}
}
76 changes: 76 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ pub struct Config {

hystart: bool,

pacing: bool,

dgram_recv_max_queue_len: usize,
dgram_send_max_queue_len: usize,

Expand All @@ -476,6 +478,7 @@ impl Config {
grease: true,
cc_algorithm: CongestionControlAlgorithm::CUBIC,
hystart: true,
pacing: true,

dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
Expand Down Expand Up @@ -804,6 +807,13 @@ impl Config {
self.dgram_recv_max_queue_len = recv_queue_len;
self.dgram_send_max_queue_len = send_queue_len;
}

/// Configures whether to enable Packet Pacing.
///
/// The default is `true`.
pub fn enable_pacing(&mut self, v: bool) {
self.pacing = v;
}
}

/// A QUIC connection.
Expand Down Expand Up @@ -2763,6 +2773,72 @@ impl Connection {
Ok(written)
}

/// Writes a single QUIC packet to be sent to the peer and fills in
/// send_time of the packet
///
/// On success the number of bytes written to the output buffer is
/// returned, or [`Done`] if there was nothing to write. And send_time
/// is filled with time to send the packet out.
///
/// The application should call `send()` multiple times until [`Done`] is
/// returned, indicating that there are no more packets to send. The time
/// field send_time should be discarded when error is returned. It is
/// recommended that `send()` be called in the following cases:
///
/// * When the application receives QUIC packets from the peer (that is,
/// any time [`recv()`] is also called).
///
/// * When the connection timer expires (that is, any time [`on_timeout()`]
/// is also called).
///
/// * When the application sends data to the peer (for examples, any time
/// [`stream_send()`] or [`stream_shutdown()`] are called).
///
/// [`Done`]: enum.Error.html#variant.Done
/// [`recv()`]: struct.Connection.html#method.recv
/// [`on_timeout()`]: struct.Connection.html#method.on_timeout
/// [`stream_send()`]: struct.Connection.html#method.stream_send
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
///
/// ## Examples:
///
/// ```no_run
/// # let mut out = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
/// # let scid = [0xba; 16];
/// # let mut conn = quiche::accept(&scid, None, &mut config)?;
/// # let mut send_time = std::time::Instant::now();
/// loop {
/// let write = match conn.send_at(&mut out, &mut send_time) {
/// Ok(v) => v,
///
/// Err(quiche::Error::Done) => {
/// // Done writing.
/// break;
/// },
///
/// Err(e) => {
/// // An error occurred, handle it.
/// break;
/// },
/// };
///
/// // pass send_time to kernel using SO_TXTIME
/// socket.send(&out[..write]).unwrap();
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn send_at(
&mut self, out: &mut [u8], send_time: &mut time::Instant,
) -> Result<usize> {
let written = self.send(out);
if written.is_ok() {
*send_time = self.recovery.get_packet_send_time().unwrap();
}
written
}

// Returns the maximum size of a packet to be sent.
//
// This is a minimum of the sender's and the receiver's maximum UDP payload
Expand Down
85 changes: 85 additions & 0 deletions src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub struct Recovery {
// Congestion control.
cc_ops: &'static CongestionControlOps,

cc_algo: CongestionControlAlgorithm,

congestion_window: usize,

bytes_in_flight: usize,
Expand All @@ -116,6 +118,8 @@ pub struct Recovery {

bytes_acked: usize,

bytes_sent: usize,

congestion_recovery_start_time: Option<Instant>,

max_datagram_size: usize,
Expand All @@ -124,6 +128,13 @@ pub struct Recovery {

// HyStart++.
hystart: hystart::Hystart,

// Pacing
pacing_enabled: bool,

pacing_rate: usize,

last_packet_scheduled_time: Option<Instant>,
}

impl Recovery {
Expand Down Expand Up @@ -178,19 +189,29 @@ impl Recovery {

bytes_acked: 0,

bytes_sent: 0,

congestion_recovery_start_time: None,

max_datagram_size: config.max_send_udp_payload_size,

cc_ops: config.cc_algorithm.into(),

cc_algo: config.cc_algorithm,

delivery_rate: delivery_rate::Rate::default(),

cubic_state: cubic::State::default(),

app_limited: false,

hystart: hystart::Hystart::new(config.hystart),

pacing_enabled: config.pacing,

pacing_rate: 0,

last_packet_scheduled_time: None,
}
}

Expand Down Expand Up @@ -233,13 +254,71 @@ impl Recovery {
self.hystart.start_round(pkt_num);
}

// Pacing: Set the pacing rate if BBR is not used
if self.pacing_enabled &&
self.cc_algo == CongestionControlAlgorithm::CUBIC
{
let rate = match self.smoothed_rtt {
Some(srtt) =>
((self.congestion_window as u128 * 1000000) /
srtt.as_micros()) as usize,
None => 0,
};
self.set_pacing_rate(rate);
}

self.schedule_next_packet(epoch, now, sent_bytes);

self.bytes_sent += sent_bytes;
trace!("{} {:?}", trace_id, self);
}

fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
(self.cc_ops.on_packet_sent)(self, sent_bytes, now);
}

pub fn set_pacing_rate(&mut self, rate: usize) {
if self.pacing_enabled && rate > 0 {
self.pacing_rate = rate;
}
}

pub fn get_packet_send_time(&self) -> Option<Instant> {
self.last_packet_scheduled_time
}

fn schedule_next_packet(
&mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
) {
// Pacing is not done for following cases,
// 1. Packet epoch is not EPOCH_APPLICATION.
// 2. If packet has only ACK frames.
// 3. Start of the connection.
if !self.pacing_enabled ||
epoch != packet::EPOCH_APPLICATION ||
packet_size == 0 ||
self.bytes_sent <= self.congestion_window ||
self.pacing_rate == 0
{
self.last_packet_scheduled_time = Some(now);
return;
}

let interval = (packet_size * 1000000) / self.pacing_rate;
let next_send_interval = Duration::from_micros(interval as u64);

let next_schedule_time = match self.last_packet_scheduled_time {
Some(last_scheduled_time) => last_scheduled_time + next_send_interval,
None => now,
};

self.last_packet_scheduled_time = if next_schedule_time <= now {
Some(now)
} else {
Some(next_schedule_time)
};
}

pub fn on_ack_received(
&mut self, ranges: &ranges::RangeSet, ack_delay: u64,
epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
Expand Down Expand Up @@ -891,6 +970,12 @@ impl std::fmt::Debug for Recovery {
self.congestion_recovery_start_time
)?;
write!(f, "{:?} ", self.delivery_rate)?;
write!(f, "pacing_rate={:?}", self.pacing_rate)?;
write!(
f,
"last_packet_scheduled_time={:?}",
self.last_packet_scheduled_time
)?;

if self.hystart.enabled() {
write!(f, "hystart={:?} ", self.hystart)?;
Expand Down

0 comments on commit d53b9e5

Please sign in to comment.