Skip to content

Commit

Permalink
Improve pipeline backoff (#1097)
Browse files Browse the repository at this point in the history
* Yield task for backoff

* Improve comments and error handling in backoff

* Simplify pipeline pull

* Consider backoff configuration
  • Loading branch information
Mallets authored Jun 10, 2024
1 parent c279982 commit 0942a69
Showing 1 changed file with 46 additions and 37 deletions.
83 changes: 46 additions & 37 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use zenoh_protocol::{
type NanoSeconds = u32;

const RBLEN: usize = QueueSizeConf::MAX;
const TSLOT: NanoSeconds = 100;

// Inner structure to reuse serialization batches
struct StageInRefill {
Expand Down Expand Up @@ -347,15 +346,17 @@ enum Pull {
// Inner structure to keep track and signal backoff operations
#[derive(Clone)]
struct Backoff {
tslot: NanoSeconds,
retry_time: NanoSeconds,
last_bytes: BatchSize,
bytes: Arc<AtomicU16>,
backoff: Arc<AtomicBool>,
}

impl Backoff {
fn new(bytes: Arc<AtomicU16>, backoff: Arc<AtomicBool>) -> Self {
fn new(tslot: NanoSeconds, bytes: Arc<AtomicU16>, backoff: Arc<AtomicBool>) -> Self {
Self {
tslot,
retry_time: 0,
last_bytes: 0,
bytes,
Expand All @@ -365,7 +366,7 @@ impl Backoff {

fn next(&mut self) {
if self.retry_time == 0 {
self.retry_time = TSLOT;
self.retry_time = self.tslot;
self.backoff.store(true, Ordering::Relaxed);
} else {
match self.retry_time.checked_mul(2) {
Expand All @@ -383,7 +384,7 @@ impl Backoff {
}
}

fn stop(&mut self) {
fn reset(&mut self) {
self.retry_time = 0;
self.backoff.store(false, Ordering::Relaxed);
}
Expand All @@ -400,7 +401,6 @@ impl StageOutIn {
#[inline]
fn try_pull(&mut self) -> Pull {
if let Some(batch) = self.s_out_r.pull() {
self.backoff.stop();
return Pull::Some(batch);
}

Expand All @@ -412,41 +412,26 @@ impl StageOutIn {
let old_bytes = self.backoff.last_bytes;
self.backoff.last_bytes = new_bytes;

match new_bytes.cmp(&old_bytes) {
std::cmp::Ordering::Equal => {
// No new bytes have been written on the batch, try to pull
if let Ok(mut g) = self.current.try_lock() {
// First try to pull from stage OUT
if let Some(batch) = self.s_out_r.pull() {
self.backoff.stop();
if new_bytes == old_bytes {
// It seems no new bytes have been written on the batch, try to pull
if let Ok(mut g) = self.current.try_lock() {
// First try to pull from stage OUT to make sure we are not in the case
// where new_bytes == old_bytes are because of two identical serializations
if let Some(batch) = self.s_out_r.pull() {
return Pull::Some(batch);
}

// An incomplete (non-empty) batch may be available in the state IN pipeline.
match g.take() {
Some(batch) => {
return Pull::Some(batch);
}

// An incomplete (non-empty) batch is available in the state IN pipeline.
match g.take() {
Some(batch) => {
self.backoff.stop();
return Pull::Some(batch);
}
None => {
self.backoff.stop();
return Pull::None;
}
None => {
return Pull::None;
}
}
// Go to backoff
}
std::cmp::Ordering::Less => {
// There should be a new batch in Stage OUT
if let Some(batch) = self.s_out_r.pull() {
self.backoff.stop();
return Pull::Some(batch);
}
// Go to backoff
}
std::cmp::Ordering::Greater => {
// Go to backoff
}
// Go to backoff
}

// Do backoff
Expand Down Expand Up @@ -569,7 +554,7 @@ impl TransmissionPipeline {
s_in: StageOutIn {
s_out_r,
current,
backoff: Backoff::new(bytes, backoff),
backoff: Backoff::new(config.backoff.as_nanos() as NanoSeconds, bytes, backoff),
},
s_ref: StageOutRefill { n_ref_w, s_ref_w },
});
Expand Down Expand Up @@ -657,6 +642,11 @@ pub(crate) struct TransmissionPipelineConsumer {

impl TransmissionPipelineConsumer {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> {
// Reset backoff before pulling
for queue in self.stage_out.iter_mut() {
queue.s_in.backoff.reset();
}

while self.active.load(Ordering::Relaxed) {
// Calculate the backoff maximum
let mut bo = NanoSeconds::MAX;
Expand All @@ -674,10 +664,29 @@ impl TransmissionPipelineConsumer {
}
}

// In case of writing many small messages, `recv_async()` will most likely return immediately.
// While trying to pull from the queue, the stage_in `lock()` will most likely be taken, leading to
// a spinning behaviour while attempting to take the lock. Yield the current task to avoid
// spinning the current task indefinitely.
tokio::task::yield_now().await;

// Wait for the backoff to expire or for a new message
let _ =
let res =
tokio::time::timeout(Duration::from_nanos(bo as u64), self.n_out_r.recv_async())
.await;
match res {
Ok(Ok(())) => {
// We have received a notification from the channel that some bytes are available, retry to pull.
}
Ok(Err(_channel_error)) => {
// The channel is closed, we can't be notified anymore. Break the loop and return None.
break;
}
Err(_timeout) => {
// The backoff timeout expired. Be aware that tokio timeout may not sleep for short durations since
// it has a time resolution of 1ms: https://docs.rs/tokio/latest/tokio/time/fn.sleep.html
}
}
}
None
}
Expand Down

0 comments on commit 0942a69

Please sign in to comment.