Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch reliability from subscriber to publisher. #630

Merged
merged 7 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ Primitives
.. autocfunction:: primitives.h::z_sample_timestamp
.. autocfunction:: primitives.h::z_sample_encoding
.. autocfunction:: primitives.h::z_sample_kind
.. autocfunction:: primitives.h::z_sample_reliability
.. autocfunction:: primitives.h::z_sample_attachment
.. autocfunction:: primitives.h::z_put_options_default
.. autocfunction:: primitives.h::z_delete_options_default
Expand Down
11 changes: 11 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,17 @@ const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample);
*/
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample);

/**
* Gets the reliability a sample was received with.
*
* Parameters:
* sample: Pointer to a :c:type:`z_loaned_sample_t` to get the reliability from.
*
* Return:
* The reliability wrapped as a :c:type:`z_reliability_t`.
*/
z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample);

/**
* Got sample qos congestion control value.
*
Expand Down
11 changes: 4 additions & 7 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,10 @@ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err)

/**
* Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`.
*
* Members:
* (unstable) z_reliability_t reliability: The subscription reliability value.
*.
*/
typedef struct {
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#else
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
#endif
} z_subscriber_options_t;

/**
Expand Down Expand Up @@ -218,6 +212,7 @@ typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
z_reliability_t reliability;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reliability is not under Z_FEATURE_UNSTABLE_API anymore ?

} z_publisher_options_t;

/**
Expand Down Expand Up @@ -296,6 +291,7 @@ typedef struct {
z_timestamp_t *timestamp;
_Bool is_express;
z_moved_bytes_t *attachment;
z_reliability_t reliability;
} z_put_options_t;

/**
Expand All @@ -312,6 +308,7 @@ typedef struct {
z_priority_t priority;
_Bool is_express;
z_timestamp_t *timestamp;
z_reliability_t reliability;
} z_delete_options_t;

/**
Expand Down
14 changes: 8 additions & 6 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid);
* keyexpr: The resource key to publish. The callee gets the ownership
* of any allocated value.
* encoding: The optional default encoding to use during put. The callee gets the ownership.
* reliability: The reliability of the publisher messages
*
* Returns:
* The created :c:type:`_z_publisher_t` (in null state if the declaration failed)..
*/
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express);
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express,
z_reliability_t reliability);

/**
* Undeclare a :c:type:`_z_publisher_t`.
Expand Down Expand Up @@ -118,12 +120,14 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* timestamp: The timestamp of this write. The API level timestamp (e.g. of the data when it was created).
* attachment: An optional attachment to this write.
* reliability: The message reliability.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t *encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment,
z_reliability_t reliability);
#endif

#if Z_FEATURE_SUBSCRIPTION == 1
Expand All @@ -134,16 +138,14 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to subscribe. The callee gets the ownership
* of any allocated value.
* sub_info: The :c:type:`_z_subinfo_t` to configure the :c:type:`_z_subscriber_t`.
* The callee gets the ownership of any allocated value.
* callback: The callback function that will be called each time a data matching the subscribed resource is
* received. arg: A pointer that will be passed to the **callback** on each call.
*
* Returns:
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
*/
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_data_handler_t callback, _z_drop_handler_t dropper, void *arg);
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_data_handler_t callback,
_z_drop_handler_t dropper, void *arg);

/**
* Undeclare a :c:type:`_z_subscriber_t`.
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ typedef struct _z_publisher_t {
_z_encoding_t _encoding;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
z_reliability_t reliability;
_Bool _is_express;
#if Z_FEATURE_INTEREST == 1
_z_write_filter_t _filter;
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct _z_sample_t {
z_sample_kind_t kind;
_z_qos_t qos;
_z_bytes_t attachment;
z_reliability_t reliability;
} _z_sample_t;
void _z_sample_clear(_z_sample_t *sample);

Expand All @@ -56,6 +57,6 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);
const _z_bytes_t attachment, z_reliability_t reliability);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
7 changes: 0 additions & 7 deletions include/zenoh-pico/net/subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ typedef struct {
} _z_subscriber_t;

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Create a default subscription info for a push subscriber.
*
* Returns:
* A :c:type:`_z_subinfo_t` containing the created subscription info.
*/
_z_subinfo_t _z_subinfo_default(void);

void _z_subscriber_clear(_z_subscriber_t *sub);
void _z_subscriber_free(_z_subscriber_t **sub);
Expand Down
11 changes: 0 additions & 11 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,6 @@ typedef struct {
_z_zint_t n;
} _z_target_complete_body_t;

/**
* Informations to be passed to :c:func:`_z_declare_subscriber` to configure the created
* :c:type:`_z_subscription_rc_t`.
*
* Members:
* z_reliability_t reliability: The subscription reliability.
*/
typedef struct {
z_reliability_t reliability;
} _z_subinfo_t;

typedef struct {
_z_id_t _id;
uint32_t _entity_id;
Expand Down
5 changes: 1 addition & 4 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ _z_undecl_kexpr_t _z_undecl_kexpr_null(void);
typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
struct {
_Bool _reliable;
} _ext_subinfo;
} _z_decl_subscriber_t;
_z_decl_subscriber_t _z_decl_subscriber_null(void);
typedef struct {
Expand Down Expand Up @@ -105,7 +102,7 @@ void _z_decl_fix_mapping(_z_declaration_t* msg, uint16_t mapping);
_z_declaration_t _z_make_decl_keyexpr(uint16_t id, _Z_MOVE(_z_keyexpr_t) key);
_z_declaration_t _z_make_undecl_keyexpr(uint16_t id);

_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable);
_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ typedef union {
typedef struct {
enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL, _Z_N_INTEREST } _tag;
_z_network_body_t _body;
z_reliability_t _reliability;
} _z_network_message_t;
typedef _z_network_message_t _z_zenoh_message_t;
void _z_n_msg_clear(_z_network_message_t *m);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ typedef struct {
} _z_transport_message_t;
void _z_t_msg_clear(_z_transport_message_t *msg);

z_reliability_t _z_t_msg_get_reliability(_z_transport_message_t *msg);

/*------------------ Builders ------------------*/
_z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _z_id_t zid,
_z_conduit_sn_list_t next_sn);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/push.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
#ifndef ZENOH_PICO_SESSION_PUSH_H
#define ZENOH_PICO_SESSION_PUSH_H

int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push);
int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliability_t reliability);

#endif /* ZENOH_PICO_SESSION_PUSH_H */
1 change: 0 additions & 1 deletion include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ typedef struct {
_z_data_handler_t _callback;
_z_drop_handler_t _dropper;
void *_arg;
_z_subinfo_t _info;
} _z_subscription_t;

_Bool _z_subscription_eq(const _z_subscription_t *one, const _z_subscription_t *two);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/*------------------ Subscription ------------------*/
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);
const _z_bytes_t attachment, z_reliability_t reliability);

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id);
Expand All @@ -30,7 +30,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t
_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment);
const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
#endif
Expand Down
45 changes: 20 additions & 25 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,7 @@ z_result_t z_id_to_string(z_id_t *id, z_owned_string_t *str) {

const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &sample->keyexpr; }
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return sample->kind; }
z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample) { return sample->reliability; }
const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &sample->payload; }
const z_timestamp_t *z_sample_timestamp(const z_loaned_sample_t *sample) {
if (_z_timestamp_check(&sample->timestamp)) {
Expand Down Expand Up @@ -1046,13 +1047,15 @@ void z_put_options_default(z_put_options_t *options) {
options->is_express = false;
options->timestamp = NULL;
options->attachment = NULL;
options->reliability = Z_RELIABILITY_DEFAULT;
}

void z_delete_options_default(z_delete_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->is_express = false;
options->timestamp = NULL;
options->priority = Z_PRIORITY_DEFAULT;
options->reliability = Z_RELIABILITY_DEFAULT;
}

int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_moved_bytes_t *payload,
Expand All @@ -1068,19 +1071,21 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
opt.reliability = options->reliability;
}

_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
ret = _z_write(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this),
opt.encoding == NULL ? NULL : &opt.encoding->_this._val, Z_SAMPLE_KIND_PUT, opt.congestion_control,
opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this));
opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this),
opt.reliability);

// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this),
opt.encoding == NULL ? NULL : &opt.encoding->_this._val,
_z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this));
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), opt.reliability);
// Clean-up
z_encoding_drop(opt.encoding);
z_bytes_drop(opt.attachment);
Expand All @@ -1098,9 +1103,10 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
opt.priority = options->priority;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.reliability = options->reliability;
}
ret = _z_write(_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, opt.congestion_control,
opt.priority, opt.is_express, opt.timestamp, _z_bytes_null());
opt.priority, opt.is_express, opt.timestamp, _z_bytes_null(), opt.reliability);

return ret;
}
Expand All @@ -1110,6 +1116,7 @@ void z_publisher_options_default(z_publisher_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->is_express = false;
options->reliability = Z_RELIABILITY_DEFAULT;
}

int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
Expand All @@ -1135,8 +1142,9 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
opt = *options;
}
// Set publisher
_z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.encoding == NULL ? NULL : &opt.encoding->_this._val,
opt.congestion_control, opt.priority, opt.is_express);
_z_publisher_t int_pub =
_z_declare_publisher(zs, key, opt.encoding == NULL ? NULL : &opt.encoding->_this._val, opt.congestion_control,
opt.priority, opt.is_express, opt.reliability);
// Create write filter
int8_t res = _z_write_filter_create(&int_pub);
if (res != _Z_RES_OK) {
Expand Down Expand Up @@ -1184,13 +1192,13 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload
// Write value
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(&opt.attachment->_this));
_z_bytes_from_owned_bytes(&opt.attachment->_this), pub->reliability);
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this));
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), pub->reliability);
// Clean-up
_z_encoding_clear(&encoding);
z_bytes_drop(opt.attachment);
Expand All @@ -1209,7 +1217,8 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null());
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(),
pub->reliability);
}

const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) {
Expand Down Expand Up @@ -1530,16 +1539,11 @@ void _z_subscriber_drop(_z_subscriber_t *sub) { _z_undeclare_and_clear_subscribe
_Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_subscriber_t, subscriber, _z_subscriber_check, _z_subscriber_null,
_z_subscriber_drop)

void z_subscriber_options_default(z_subscriber_options_t *options) {
#if Z_FEATURE_UNSTABLE_API == 1
options->reliability = Z_RELIABILITY_DEFAULT;
#else
options->__dummy = 0;
#endif
}
void z_subscriber_options_default(z_subscriber_options_t *options) { options->__dummy = 0; }

int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
z_moved_closure_sample_t *callback, const z_subscriber_options_t *options) {
_ZP_UNUSED(options);
void *ctx = callback->_this._val.context;
callback->_this._val.context = NULL;

Expand Down Expand Up @@ -1575,16 +1579,7 @@ int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t
}
}

_z_subinfo_t subinfo = _z_subinfo_default();
if (options != NULL) {
#if Z_FEATURE_UNSTABLE_API == 1
subinfo.reliability = options->reliability;
#else
subinfo.reliability = Z_RELIABILITY_DEFAULT;
#endif
}
_z_subscriber_t int_sub =
_z_declare_subscriber(zs, key, subinfo, callback->_this._val.call, callback->_this._val.drop, ctx);
_z_subscriber_t int_sub = _z_declare_subscriber(zs, key, callback->_this._val.call, callback->_this._val.drop, ctx);

z_internal_closure_sample_null(&callback->_this);
sub->_val = int_sub;
Expand Down
Loading
Loading