Skip to content

Commit

Permalink
Allow to enable/disable batching from config (#1196)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets authored Jun 26, 2024
1 parent 73bdb7f commit ebc684c
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 7 deletions.
2 changes: 2 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@
/// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
/// The default batch size value is the maximum batch size: 65535.
batch_size: 65535,
/// Perform batching of messages if they are smaller of the batch_size
batching: true,
/// Each zenoh link has a transmission queue that can be configured
queue: {
/// The size of each priority queue indicates the number of batches a given queue can contain.
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl Default for LinkTxConf {
batch_size: BatchSize::MAX,
queue: QueueConf::default(),
threads: num,
batching: true,
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,10 @@ validated_struct::validator! {
lease: u64,
/// Number of keep-alive messages in a link lease duration (default: 4)
keep_alive: usize,
/// Zenoh's MTU equivalent (default: 2^16-1)
/// Zenoh's MTU equivalent (default: 2^16-1) (max: 2^16-1)
batch_size: BatchSize,
/// Perform batching of messages if they are smaller of the batch_size
batching: bool,
pub queue: QueueConf {
/// The size of each priority queue indicates the number of batches a given queue can contain.
/// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
Expand Down
23 changes: 17 additions & 6 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ struct StageIn {
s_out: StageInOut,
mutex: StageInMutex,
fragbuf: ZBuf,
batching: bool,
}

impl StageIn {
Expand Down Expand Up @@ -179,7 +180,7 @@ impl StageIn {

macro_rules! zretok {
($batch:expr, $msg:expr) => {{
if $msg.is_express() {
if !self.batching || $msg.is_express() {
// Move out existing batch
self.s_out.move_batch($batch);
return true;
Expand Down Expand Up @@ -315,11 +316,17 @@ impl StageIn {

macro_rules! zretok {
($batch:expr) => {{
let bytes = $batch.len();
*c_guard = Some($batch);
drop(c_guard);
self.s_out.notify(bytes);
return true;
if !self.batching {
// Move out existing batch
self.s_out.move_batch($batch);
return true;
} else {
let bytes = $batch.len();
*c_guard = Some($batch);
drop(c_guard);
self.s_out.notify(bytes);
return true;
}
}};
}

Expand Down Expand Up @@ -494,6 +501,7 @@ pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) batching: bool,
pub(crate) backoff: Duration,
}

Expand Down Expand Up @@ -554,6 +562,7 @@ impl TransmissionPipeline {
priority: priority[prio].clone(),
},
fragbuf: ZBuf::empty(),
batching: config.batching,
}));

// The stage out for this priority
Expand Down Expand Up @@ -765,6 +774,7 @@ mod tests {
is_compression: true,
},
queue_size: [1; Priority::NUM],
batching: true,
wait_before_drop: Duration::from_millis(1),
backoff: Duration::from_micros(1),
};
Expand All @@ -777,6 +787,7 @@ mod tests {
is_compression: false,
},
queue_size: [1; Priority::NUM],
batching: true,
wait_before_drop: Duration::from_millis(1),
backoff: Duration::from_micros(1),
};
Expand Down
10 changes: 10 additions & 0 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub struct TransportManagerConfig {
pub whatami: WhatAmI,
pub resolution: Resolution,
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
Expand Down Expand Up @@ -129,6 +130,7 @@ pub struct TransportManagerBuilder {
whatami: WhatAmI,
resolution: Resolution,
batch_size: BatchSize,
batching: bool,
wait_before_drop: Duration,
queue_size: QueueSizeConf,
queue_backoff: Duration,
Expand Down Expand Up @@ -170,6 +172,11 @@ impl TransportManagerBuilder {
self
}

pub fn batching(mut self, batching: bool) -> Self {
self.batching = batching;
self
}

pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self {
self.wait_before_drop = wait_before_drop;
self
Expand Down Expand Up @@ -231,6 +238,7 @@ impl TransportManagerBuilder {
resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
self = self.resolution(resolution);
self = self.batch_size(*link.tx().batch_size());
self = self.batching(*link.tx().batching());
self = self.defrag_buff_size(*link.rx().max_message_size());
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop(Duration::from_micros(
Expand Down Expand Up @@ -293,6 +301,7 @@ impl TransportManagerBuilder {
whatami: self.whatami,
resolution: self.resolution,
batch_size: self.batch_size,
batching: self.batching,
wait_before_drop: self.wait_before_drop,
queue_size,
queue_backoff: self.queue_backoff,
Expand Down Expand Up @@ -339,6 +348,7 @@ impl Default for TransportManagerBuilder {
whatami: zenoh_config::defaults::mode,
resolution: Resolution::default(),
batch_size: BatchSize::MAX,
batching: true,
wait_before_drop: Duration::from_micros(wait_before_drop),
queue_size: queue.size,
queue_backoff: Duration::from_nanos(backoff),
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ impl TransportLinkMulticastUniversal {
batch: self.link.config.batch,
queue_size: self.transport.manager.config.queue_size,
wait_before_drop: self.transport.manager.config.wait_before_drop,
batching: self.transport.manager.config.batching,
backoff: self.transport.manager.config.queue_backoff,
};
// The pipeline
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl TransportLinkUnicastUniversal {
},
queue_size: transport.manager.config.queue_size,
wait_before_drop: transport.manager.config.wait_before_drop,
batching: transport.manager.config.batching,
backoff: transport.manager.config.queue_backoff,
};

Expand Down

0 comments on commit ebc684c

Please sign in to comment.