Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve pipeline backoff #1097

Merged
merged 4 commits into from
Jun 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 46 additions & 37 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use zenoh_protocol::{
type NanoSeconds = u32;

const RBLEN: usize = QueueSizeConf::MAX;
const TSLOT: NanoSeconds = 100;

// Inner structure to reuse serialization batches
struct StageInRefill {
Expand Down Expand Up @@ -347,15 +346,17 @@ enum Pull {
// Inner structure to keep track and signal backoff operations
#[derive(Clone)]
struct Backoff {
tslot: NanoSeconds,
retry_time: NanoSeconds,
last_bytes: BatchSize,
bytes: Arc<AtomicU16>,
backoff: Arc<AtomicBool>,
}

impl Backoff {
fn new(bytes: Arc<AtomicU16>, backoff: Arc<AtomicBool>) -> Self {
fn new(tslot: NanoSeconds, bytes: Arc<AtomicU16>, backoff: Arc<AtomicBool>) -> Self {
Self {
tslot,
retry_time: 0,
last_bytes: 0,
bytes,
Expand All @@ -365,7 +366,7 @@ impl Backoff {

fn next(&mut self) {
if self.retry_time == 0 {
self.retry_time = TSLOT;
self.retry_time = self.tslot;
self.backoff.store(true, Ordering::Relaxed);
} else {
match self.retry_time.checked_mul(2) {
Expand All @@ -383,7 +384,7 @@ impl Backoff {
}
}

fn stop(&mut self) {
fn reset(&mut self) {
self.retry_time = 0;
self.backoff.store(false, Ordering::Relaxed);
}
Expand All @@ -400,7 +401,6 @@ impl StageOutIn {
#[inline]
fn try_pull(&mut self) -> Pull {
if let Some(batch) = self.s_out_r.pull() {
self.backoff.stop();
return Pull::Some(batch);
}

Expand All @@ -412,41 +412,26 @@ impl StageOutIn {
let old_bytes = self.backoff.last_bytes;
self.backoff.last_bytes = new_bytes;

match new_bytes.cmp(&old_bytes) {
std::cmp::Ordering::Equal => {
// No new bytes have been written on the batch, try to pull
if let Ok(mut g) = self.current.try_lock() {
// First try to pull from stage OUT
if let Some(batch) = self.s_out_r.pull() {
self.backoff.stop();
if new_bytes == old_bytes {
// It seems no new bytes have been written on the batch, try to pull
if let Ok(mut g) = self.current.try_lock() {
// First try to pull from stage OUT to make sure we are not in the case
// where new_bytes == old_bytes are because of two identical serializations
if let Some(batch) = self.s_out_r.pull() {
return Pull::Some(batch);
}

// An incomplete (non-empty) batch may be available in the state IN pipeline.
match g.take() {
Some(batch) => {
return Pull::Some(batch);
}

// An incomplete (non-empty) batch is available in the state IN pipeline.
match g.take() {
Some(batch) => {
self.backoff.stop();
return Pull::Some(batch);
}
None => {
self.backoff.stop();
return Pull::None;
}
None => {
return Pull::None;
}
}
// Go to backoff
}
std::cmp::Ordering::Less => {
// There should be a new batch in Stage OUT
if let Some(batch) = self.s_out_r.pull() {
self.backoff.stop();
return Pull::Some(batch);
}
// Go to backoff
}
std::cmp::Ordering::Greater => {
// Go to backoff
}
// Go to backoff
}

// Do backoff
Expand Down Expand Up @@ -569,7 +554,7 @@ impl TransmissionPipeline {
s_in: StageOutIn {
s_out_r,
current,
backoff: Backoff::new(bytes, backoff),
backoff: Backoff::new(config.backoff.as_nanos() as NanoSeconds, bytes, backoff),
},
s_ref: StageOutRefill { n_ref_w, s_ref_w },
});
Expand Down Expand Up @@ -657,6 +642,11 @@ pub(crate) struct TransmissionPipelineConsumer {

impl TransmissionPipelineConsumer {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> {
// Reset backoff before pulling
for queue in self.stage_out.iter_mut() {
queue.s_in.backoff.reset();
}

while self.active.load(Ordering::Relaxed) {
// Calculate the backoff maximum
let mut bo = NanoSeconds::MAX;
Expand All @@ -674,10 +664,29 @@ impl TransmissionPipelineConsumer {
}
}

// In case of writing many small messages, `recv_async()` will most likely return immedietaly.
// While trying to pull from the queue, the stage_in `lock()` will most likely taken, leading to
// a spinning behaviour while attempting to take the lock. Yield the current task to avoid
// spinning the current task indefinitely.
tokio::task::yield_now().await;

// Wait for the backoff to expire or for a new message
let _ =
let res =
tokio::time::timeout(Duration::from_nanos(bo as u64), self.n_out_r.recv_async())
.await;
match res {
Ok(Ok(())) => {
// We have received a notification from the channel that some bytes are available, retry to pull.
}
Ok(Err(_channel_error)) => {
// The channel is closed, we can't be notified anymore. Break the loop and return None.
break;
}
Err(_timeout) => {
// The backoff timeout expired. Be aware that tokio timeout may not sleep for short duration since
// it has time resolution of 1ms: https://docs.rs/tokio/latest/tokio/time/fn.sleep.html
}
}
}
None
}
Expand Down