//! Pacing of packet transmissions.
use crate::{Duration, Instant};
use tracing::warn;
/// A simple token-bucket pacer
///
/// The pacer's capacity is derived from a fraction of the congestion window
/// which can be sent at regular intervals.
/// Once the bucket is empty, further transmission is blocked.
/// The bucket refills at a rate slightly faster
/// than one congestion window per RTT, as recommended in
/// <https://tools.ietf.org/html/draft-ietf-quic-recovery-34#section-7.7>
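///
/// A minimal caller-side sketch (illustrative only; `srtt`, `window`, `mtu`,
/// `packet_len` and `now` are assumed to come from the connection's own state):
///
/// ```ignore
/// let mut pacer = Pacer::new(srtt, window, mtu, now);
/// match pacer.delay(srtt, packet_len.into(), mtu, window, now) {
///     // Enough tokens: transmit now, then drain the bucket by the bytes sent.
///     None => pacer.on_transmit(packet_len),
///     // Bucket is empty: re-poll no earlier than `wake_at`.
///     Some(wake_at) => { /* arm a timer for `wake_at` */ }
/// }
/// ```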
pub(super) struct Pacer {
capacity: u64,
last_window: u64,
last_mtu: u16,
tokens: u64,
prev: Instant,
}
impl Pacer {
/// Obtains a new [`Pacer`].
pub(super) fn new(smoothed_rtt: Duration, window: u64, mtu: u16, now: Instant) -> Self {
let capacity = optimal_capacity(smoothed_rtt, window, mtu);
Self {
capacity,
last_window: window,
last_mtu: mtu,
tokens: capacity,
prev: now,
}
}
/// Record that a packet has been transmitted.
pub(super) fn on_transmit(&mut self, packet_length: u16) {
self.tokens = self.tokens.saturating_sub(packet_length.into())
}
/// Return how long we need to wait before sending `bytes_to_send`
///
/// If we can send a packet right away, this returns `None`. Otherwise, returns `Some(d)`,
/// where `d` is the time before this function should be called again.
///
/// The 5/4 ratio used here comes from the recommendation of N = 1.25 in the IETF QUIC
/// recovery draft.
pub(super) fn delay(
&mut self,
smoothed_rtt: Duration,
bytes_to_send: u64,
mtu: u16,
window: u64,
now: Instant,
) -> Option<Instant> {
debug_assert_ne!(
window, 0,
"zero-sized congestion control window is nonsense"
);
if window != self.last_window || mtu != self.last_mtu {
self.capacity = optimal_capacity(smoothed_rtt, window, mtu);
// Clamp the tokens
self.tokens = self.capacity.min(self.tokens);
self.last_window = window;
self.last_mtu = mtu;
}
// if we can already send a packet, there is no need for delay
if self.tokens >= bytes_to_send {
return None;
}
// we disable pacing for extremely large windows
if window > u32::MAX.into() {
return None;
}
let window = window as u32;
let time_elapsed = now.checked_duration_since(self.prev).unwrap_or_else(|| {
warn!("received a timestamp early than a previous recorded time, ignoring");
Default::default()
});
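// An RTT of zero would mean dividing by zero below; skip pacing and allow an
// immediate send instead.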
if smoothed_rtt.as_nanos() == 0 {
return None;
}
let elapsed_rtts = time_elapsed.as_secs_f64() / smoothed_rtt.as_secs_f64();
let new_tokens = window as f64 * 1.25 * elapsed_rtts;
self.tokens = self
.tokens
.saturating_add(new_tokens as _)
.min(self.capacity);
self.prev = now;
// if we can already send a packet, there is no need for delay
if self.tokens >= bytes_to_send {
return None;
}
let unscaled_delay = smoothed_rtt
.checked_mul((bytes_to_send.max(self.capacity) - self.tokens) as _)
.unwrap_or(Duration::MAX)
/ window;
// divisions come before multiplications to prevent overflow
// this is the time at which the pacing window becomes empty
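// equivalently: smoothed_rtt * (bytes_to_send.max(capacity) - tokens) / (1.25 * window),
// i.e. the time needed to earn the missing tokens at 1.25 congestion windows per RTT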
Some(self.prev + (unscaled_delay / 5) * 4)
}
}
/// Calculates a pacer capacity for a certain window and RTT
///
/// The goal is to emit a burst (of size `capacity`) at timer intervals
/// which strike a compromise between
/// - distributing datagrams as evenly over time as possible, and
/// - not constantly waking up the connection to produce additional datagrams
///
/// Too short burst intervals mean we will never meet them, since the timer
/// accuracy in user-space is not high enough. If we miss the interval by more
/// than 25%, we will lose that part of the congestion window since no additional
/// tokens for the extra-elapsed time can be stored.
///
/// Too long burst intervals make pacing less effective.
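///
/// For example, with a 2 MB congestion window, a 50 ms RTT and a 1500-byte MTU,
/// the unclamped capacity is 2_000_000 * 2_000_000 / 50_000_000 = 80_000 bytes
/// (roughly 53 full-size datagrams), which lies within the clamp range of
/// [15_000, 384_000] bytes (`MIN_BURST_SIZE`/`MAX_BURST_SIZE` times the MTU).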
fn optimal_capacity(smoothed_rtt: Duration, window: u64, mtu: u16) -> u64 {
let rtt = smoothed_rtt.as_nanos().max(1);
let capacity = ((window as u128 * BURST_INTERVAL_NANOS) / rtt) as u64;
// Small bursts are less efficient (no GSO), could increase latency, and don't effectively
// use the channel's buffer capacity. Large bursts might block the connection on sending.
capacity.clamp(MIN_BURST_SIZE * mtu as u64, MAX_BURST_SIZE * mtu as u64)
}
/// The burst interval
///
/// The capacity will be refilled in 4/5 of that time.
/// 2ms is chosen here since framework timers might have 1ms precision.
/// If kernel-level pacing is supported later, a longer interval might be
/// more appropriate.
const BURST_INTERVAL_NANOS: u128 = 2_000_000; // 2ms
/// Allows some usage of GSO, and doesn't slow down the handshake.
const MIN_BURST_SIZE: u64 = 10;
/// Creating 256 packets took 1ms in a benchmark, so larger bursts don't make sense.
const MAX_BURST_SIZE: u64 = 256;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn does_not_panic_on_bad_instant() {
let old_instant = Instant::now();
let new_instant = old_instant + Duration::from_micros(15);
let rtt = Duration::from_micros(400);
assert!(Pacer::new(rtt, 30000, 1500, new_instant)
.delay(Duration::from_micros(0), 0, 1500, 1, old_instant)
.is_none());
assert!(Pacer::new(rtt, 30000, 1500, new_instant)
.delay(Duration::from_micros(0), 1600, 1500, 1, old_instant)
.is_none());
assert!(Pacer::new(rtt, 30000, 1500, new_instant)
.delay(Duration::from_micros(0), 1500, 1500, 3000, old_instant)
.is_none());
}
#[test]
fn derives_initial_capacity() {
let window = 2_000_000;
let mtu = 1500;
let rtt = Duration::from_millis(50);
let now = Instant::now();
let pacer = Pacer::new(rtt, window, mtu, now);
assert_eq!(
pacer.capacity,
(window as u128 * BURST_INTERVAL_NANOS / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, pacer.capacity);
let pacer = Pacer::new(Duration::from_millis(0), window, mtu, now);
assert_eq!(pacer.capacity, MAX_BURST_SIZE * mtu as u64);
assert_eq!(pacer.tokens, pacer.capacity);
let pacer = Pacer::new(rtt, 1, mtu, now);
assert_eq!(pacer.capacity, MIN_BURST_SIZE * mtu as u64);
assert_eq!(pacer.tokens, pacer.capacity);
}
#[test]
fn adjusts_capacity() {
let window = 2_000_000;
let mtu = 1500;
let rtt = Duration::from_millis(50);
let now = Instant::now();
let mut pacer = Pacer::new(rtt, window, mtu, now);
assert_eq!(
pacer.capacity,
(window as u128 * BURST_INTERVAL_NANOS / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, pacer.capacity);
let initial_tokens = pacer.tokens;
pacer.delay(rtt, mtu as u64, mtu, window * 2, now);
assert_eq!(
pacer.capacity,
(2 * window as u128 * BURST_INTERVAL_NANOS / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, initial_tokens);
pacer.delay(rtt, mtu as u64, mtu, window / 2, now);
assert_eq!(
pacer.capacity,
(window as u128 / 2 * BURST_INTERVAL_NANOS / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, initial_tokens / 2);
pacer.delay(rtt, mtu as u64, mtu * 2, window, now);
assert_eq!(
pacer.capacity,
(window as u128 * BURST_INTERVAL_NANOS / rtt.as_nanos()) as u64
);
pacer.delay(rtt, mtu as u64, 20_000, window, now);
assert_eq!(pacer.capacity, 20_000_u64 * MIN_BURST_SIZE);
}
#[test]
fn computes_pause_correctly() {
let window = 2_000_000u64;
let mtu = 1000;
let rtt = Duration::from_millis(50);
let old_instant = Instant::now();
let mut pacer = Pacer::new(rtt, window, mtu, old_instant);
let packet_capacity = pacer.capacity / mtu as u64;
for _ in 0..packet_capacity {
assert_eq!(
pacer.delay(rtt, mtu as u64, mtu, window, old_instant),
None,
"When capacity is available packets should be sent immediately"
);
pacer.on_transmit(mtu);
}
let pace_duration = Duration::from_nanos((BURST_INTERVAL_NANOS * 4 / 5) as u64);
assert_eq!(
pacer
.delay(rtt, mtu as u64, mtu, window, old_instant)
.expect("Send must be delayed")
.duration_since(old_instant),
pace_duration
);
// Refill half of the tokens
assert_eq!(
pacer.delay(
rtt,
mtu as u64,
mtu,
window,
old_instant + pace_duration / 2
),
None
);
assert_eq!(pacer.tokens, pacer.capacity / 2);
for _ in 0..packet_capacity / 2 {
assert_eq!(
pacer.delay(rtt, mtu as u64, mtu, window, old_instant),
None,
"When capacity is available packets should be sent immediately"
);
pacer.on_transmit(mtu);
}
// Refill all capacity by waiting more than the expected duration
assert_eq!(
pacer.delay(
rtt,
mtu as u64,
mtu,
window,
old_instant + pace_duration * 3 / 2
),
None
);
assert_eq!(pacer.tokens, pacer.capacity);
}
}