Skip to content

Commit

Permalink
feat: return is express val when encoding network message
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Jan 6, 2025
1 parent 860099f commit dc20d6c
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 21 deletions.
10 changes: 5 additions & 5 deletions include/zenoh-pico/protocol/codec/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@
extern "C" {
#endif

z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg);
z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg, bool *is_express);
z_result_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs);
z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg);
z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg, bool *is_express);
z_result_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs);
z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg);
z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg, bool *is_express);
z_result_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs);
z_result_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t *msg);
z_result_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header);
z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl);
z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl, bool *is_express);
z_result_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header);
z_result_t _z_n_interest_encode(_z_wbuf_t *wbf, const _z_n_msg_interest_t *interest);
z_result_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, uint8_t header);

z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg);
z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg, bool *is_express);
z_result_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf, _z_arc_slice_t *arcs);

#ifdef __cplusplus
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ extern "C" {

typedef _z_qos_t _z_n_qos_t;

#define _Z_N_QOS_IS_EXPRESS_FLAG (1 << 4)

static inline _z_qos_t _z_n_qos_create(bool express, z_congestion_control_t congestion_control, z_priority_t priority) {
_z_n_qos_t ret;
bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1;
Expand Down
30 changes: 21 additions & 9 deletions src/protocol/codec/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@

/*------------------ Push Message ------------------*/

z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg) {
z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg, bool *is_express) {
uint8_t header = _Z_MID_N_PUSH | (_z_keyexpr_is_local(&msg->_key) ? _Z_FLAG_N_REQUEST_M : 0);
bool has_suffix = _z_keyexpr_has_suffix(&msg->_key);
bool has_qos_ext = msg->_qos._val != _Z_N_QOS_DEFAULT._val;
bool has_timestamp_ext = _z_timestamp_check(&msg->_timestamp);
if (is_express != NULL) {
*is_express = _Z_HAS_FLAG(msg->_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
}
if (has_suffix) {
header |= _Z_FLAG_N_REQUEST_N;
}
Expand Down Expand Up @@ -111,7 +114,7 @@ z_result_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header,
}

/*------------------ Request Message ------------------*/
z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) {
z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg, bool *is_express) {
z_result_t ret = _Z_RES_OK;
uint8_t header = _Z_MID_N_REQUEST | (_z_keyexpr_is_local(&msg->_key) ? _Z_FLAG_N_REQUEST_M : 0);
bool has_suffix = _z_keyexpr_has_suffix(&msg->_key);
Expand All @@ -128,6 +131,9 @@ z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) {
uint8_t extheader = 0x01 | _Z_MSG_EXT_ENC_ZINT | (exts.n ? _Z_FLAG_Z_Z : 0);
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_ext_qos._val));
if (is_express != NULL) {
*is_express = _Z_HAS_FLAG(msg->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
}
}
if (exts.ext_tstamp) {
exts.n -= 1;
Expand Down Expand Up @@ -238,7 +244,7 @@ z_result_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, const uint
}

/*------------------ Response Message ------------------*/
z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg, bool *is_express) {
z_result_t ret = _Z_RES_OK;
uint8_t header = _Z_MID_N_RESPONSE;
_Z_DEBUG("Encoding _Z_MID_N_RESPONSE");
Expand All @@ -247,6 +253,9 @@ z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
bool has_responder_ext = _z_id_check(msg->_ext_responder._zid) || msg->_ext_responder._eid != 0;
int n_ext = (has_qos_ext ? 1 : 0) + (has_ts_ext ? 1 : 0) + (has_responder_ext ? 1 : 0);
bool has_suffix = _z_keyexpr_has_suffix(&msg->_key);
if (is_express != NULL) {
*is_express = _Z_HAS_FLAG(msg->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
}
if (_z_keyexpr_is_local(&msg->_key)) {
_Z_SET_FLAG(header, _Z_FLAG_N_RESPONSE_M);
}
Expand Down Expand Up @@ -389,10 +398,13 @@ z_result_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *z
return ret;
}

z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl) {
z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl, bool *is_express) {
uint8_t header = _Z_MID_N_DECLARE;
bool has_qos_ext = decl->_ext_qos._val != _Z_N_QOS_DEFAULT._val;
bool has_timestamp_ext = _z_timestamp_check(&decl->_ext_timestamp);
if (is_express != NULL) {
*is_express = _Z_HAS_FLAG(decl->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG);
}
int n_ext = (has_qos_ext ? 1 : 0) + (has_timestamp_ext ? 1 : 0);
if (n_ext != 0) {
header |= _Z_FLAG_N_Z;
Expand Down Expand Up @@ -489,19 +501,19 @@ z_result_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, u
return _z_interest_decode(&interest->_interest, zbf, is_final, has_ext);
}

z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg) {
z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg, bool *is_express) {
switch (msg->_tag) {
case _Z_N_DECLARE: {
return _z_declare_encode(wbf, &msg->_body._declare);
return _z_declare_encode(wbf, &msg->_body._declare, is_express);
} break;
case _Z_N_PUSH: {
return _z_push_encode(wbf, &msg->_body._push);
return _z_push_encode(wbf, &msg->_body._push, is_express);
} break;
case _Z_N_REQUEST: {
return _z_request_encode(wbf, &msg->_body._request);
return _z_request_encode(wbf, &msg->_body._request, is_express);
} break;
case _Z_N_RESPONSE: {
return _z_response_encode(wbf, &msg->_body._response);
return _z_response_encode(wbf, &msg->_body._response, is_express);
} break;
case _Z_N_RESPONSE_FINAL: {
return _z_response_final_encode(wbf, &msg->_body._response_final);
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_
if (ret == _Z_RES_OK) {
size_t len = _z_network_message_svec_len(&msg->_messages);
for (size_t i = 0; i < len; i++) {
_Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_svec_get(&msg->_messages, i)))
_Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_svec_get(&msg->_messages, i), NULL))
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_
_Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_common._wbuf, &t_msg),
_z_transport_tx_mutex_unlock(&ztm->_common));
// Encode the network message
if (_z_network_message_encode(&ztm->_common._wbuf, n_msg) == _Z_RES_OK) {
if (_z_network_message_encode(&ztm->_common._wbuf, n_msg, NULL) == _Z_RES_OK) {
// Write the eth header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf),
_z_transport_tx_mutex_unlock(&ztm->_common));
Expand All @@ -266,7 +266,8 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);
// Encode the message on the expandable wbuf
_Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_transport_tx_mutex_unlock(&ztm->_common));
_Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_transport_tx_mutex_unlock(&ztm->_common),
NULL);
// Fragment and send the message
bool is_first = true;
while (_z_wbuf_len(&fbf) > 0) {
Expand Down
8 changes: 4 additions & 4 deletions tests/z_msgcodec_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ void declare_message(void) {
_z_network_message_t n_msg = gen_declare_message();

// Encode
z_result_t res = _z_network_message_encode(&wbf, &n_msg);
z_result_t res = _z_network_message_encode(&wbf, &n_msg, NULL);
assert(res == _Z_RES_OK);
(void)(res);

Expand Down Expand Up @@ -1433,7 +1433,7 @@ void push_message(void) {
printf("\n>> Push message\n");
_z_wbuf_t wbf = gen_wbuf(UINT16_MAX);
_z_n_msg_push_t expected = gen_push();
assert(_z_push_encode(&wbf, &expected) == _Z_RES_OK);
assert(_z_push_encode(&wbf, &expected, NULL) == _Z_RES_OK);
_z_n_msg_push_t decoded = {0};
_z_arc_slice_t arcs = {0};
_z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf);
Expand Down Expand Up @@ -1505,7 +1505,7 @@ void request_message(void) {
printf("\n>> Request message\n");
_z_wbuf_t wbf = gen_wbuf(UINT16_MAX);
_z_n_msg_request_t expected = gen_request();
assert(_z_request_encode(&wbf, &expected) == _Z_RES_OK);
assert(_z_request_encode(&wbf, &expected, NULL) == _Z_RES_OK);
_z_n_msg_request_t decoded = {0};
_z_arc_slice_t arcs = {0};
_z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf);
Expand Down Expand Up @@ -1564,7 +1564,7 @@ void response_message(void) {
printf("\n>> Response message\n");
_z_wbuf_t wbf = gen_wbuf(UINT16_MAX);
_z_n_msg_response_t expected = gen_response();
assert(_z_response_encode(&wbf, &expected) == _Z_RES_OK);
assert(_z_response_encode(&wbf, &expected, NULL) == _Z_RES_OK);
_z_n_msg_response_t decoded = {0};
_z_arc_slice_t arcs = {0};
_z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf);
Expand Down

0 comments on commit dc20d6c

Please sign in to comment.