Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update batching to stable. #850

Merged
merged 8 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -2077,12 +2077,11 @@ z_result_t z_declare_background_subscriber(const z_loaned_session_t *zs, const z
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber);
#endif

#ifdef Z_FEATURE_UNSTABLE_API
#if Z_FEATURE_BATCHING == 1
/**
* Activate the batching mechanism, any message that would have been sent on the network by a subsequent api call (e.g
* z_put, z_get) will be instead stored until the batch is full, flushed with :c:func:`zp_batch_flush` or batching is
* stopped with :c:func:`zp_batch_stop`.
* z_put, z_get) will be instead stored until either: the batch is full, flushed with :c:func:`zp_batch_flush`, batching
* is stopped with :c:func:`zp_batch_stop`, a message needs to be sent immediately.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages.
Expand Down Expand Up @@ -2114,7 +2113,6 @@ z_result_t zp_batch_flush(const z_loaned_session_t *zs);
*/
z_result_t zp_batch_stop(const z_loaned_session_t *zs);
#endif
#endif

/************* Multi Thread Tasks helpers **************/
/**
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ extern "C" {

typedef _z_qos_t _z_n_qos_t;

#define _Z_N_QOS_IS_EXPRESS_FLAG (1 << 4)

static inline _z_qos_t _z_n_qos_create(bool express, z_congestion_control_t congestion_control, z_priority_t priority) {
_z_n_qos_t ret;
bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1;
Expand Down
2 changes: 0 additions & 2 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,6 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub)
}
#endif

#ifdef Z_FEATURE_UNSTABLE_API
#if Z_FEATURE_BATCHING == 1
z_result_t zp_batch_start(const z_loaned_session_t *zs) {
if (_Z_RC_IS_NULL(zs)) {
Expand Down Expand Up @@ -1519,7 +1518,6 @@ z_result_t zp_batch_stop(const z_loaned_session_t *zs) {
return _z_send_n_batch(session, Z_CONGESTION_CONTROL_DEFAULT);
}
#endif
#endif

/**************** Tasks ****************/
void zp_task_read_options_default(zp_task_read_options_t *options) {
Expand Down
60 changes: 45 additions & 15 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@

/*------------------ Transmission helper ------------------*/

static bool _z_transport_tx_get_express_status(const _z_network_message_t *msg) {
switch (msg->_tag) {
case _Z_N_DECLARE:
return _Z_HAS_FLAG(msg->_body._declare._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
case _Z_N_PUSH:
return _Z_HAS_FLAG(msg->_body._push._qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
case _Z_N_REQUEST:
return _Z_HAS_FLAG(msg->_body._request._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
case _Z_N_RESPONSE:
return _Z_HAS_FLAG(msg->_body._response._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
default:
return false;
}
}
static _z_zint_t _z_transport_tx_get_sn(_z_transport_common_t *ztc, z_reliability_t reliability) {
_z_zint_t sn;
if (reliability == Z_RELIABILITY_RELIABLE) {
Expand Down Expand Up @@ -139,13 +153,19 @@ static z_result_t _z_transport_tx_batch_overflow(_z_transport_common_t *ztc, con
_z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability);
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, &t_msg));
// Retry encode
bool is_express = _z_transport_tx_get_express_status(n_msg);
z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg);
if (ret != _Z_RES_OK) {
// Message still doesn't fit in buffer, send as fragments
return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn);
} else {
// Increment batch
ztc->_batch_count++;
if (is_express) {
// Send immediately
return _z_transport_tx_flush_buffer(ztc);
} else {
// Increment batch
ztc->_batch_count++;
}
}
return _Z_RES_OK;
#else
Expand Down Expand Up @@ -180,10 +200,16 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c
}
// Try encoding the network message
size_t prev_wpos = _z_transport_tx_save_wpos(&ztc->_wbuf);
bool is_express = _z_transport_tx_get_express_status(n_msg);
z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg);
if (ret == _Z_RES_OK) {
// Flush buffer or increase batch
return _z_transport_tx_flush_or_incr_batch(ztc);
if (is_express) {
// Send immediately
return _z_transport_tx_flush_buffer(ztc);
} else {
// Flush buffer or increase batch
return _z_transport_tx_flush_or_incr_batch(ztc);
}
} else if (!batch_has_data) {
// Message doesn't fit in buffer, send as fragments
return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn);
Expand All @@ -193,22 +219,26 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c
}
}

static z_result_t _z_transport_tx_send_t_msg_inner(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) {
// Send batch if needed
bool batch_has_data = _z_transport_tx_batch_has_data(ztc);
if (batch_has_data) {
_Z_RETURN_IF_ERR(_z_transport_tx_flush_buffer(ztc));
}
// Encode transport message
__unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, t_msg));
// Send message
return _z_transport_tx_flush_buffer(ztc);
}

z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) {
z_result_t ret = _Z_RES_OK;
_Z_DEBUG("Send session message");
_z_transport_tx_mutex_lock(ztc, true);

// Encode transport message
__unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
ret = _z_transport_message_encode(&ztc->_wbuf, t_msg);
if (ret == _Z_RES_OK) {
// Send message
__unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow);
ret = _z_link_send_wbuf(&ztc->_link, &ztc->_wbuf);
if (ret == _Z_RES_OK) {
ztc->_transmitted = true; // Tell session we transmitted data
}
}
ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg);

_z_transport_tx_mutex_unlock(ztc);
return ret;
}
Expand Down
Loading