Skip to content

Commit

Permalink
ports: fix TCP client disconnect notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed May 21, 2024
1 parent 4dd98e5 commit c0ea5d3
Show file tree
Hide file tree
Showing 26 changed files with 174 additions and 106 deletions.
14 changes: 1 addition & 13 deletions api/cloud/oc_cloud.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,7 @@ void
oc_cloud_close_endpoint(const oc_endpoint_t *ep)
{
OC_CLOUD_DBG("oc_cloud_close_endpoint");
#ifdef OC_SECURITY
const oc_tls_peer_t *peer = oc_tls_get_peer(ep);
if (peer != NULL) {
OC_CLOUD_DBG("oc_cloud_close_endpoint: oc_tls_close_connection");
oc_tls_close_connection(ep);
} else
#endif /* OC_SECURITY */
{
#ifdef OC_TCP
OC_CLOUD_DBG("oc_cloud_close_endpoint: oc_connectivity_end_session");
oc_connectivity_end_session(ep);
#endif /* OC_TCP */
}
oc_close_session(ep);
}

void
Expand Down
24 changes: 17 additions & 7 deletions api/oc_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@

#include "oc_config.h"

#include "oc_api.h"
#include "oc_endpoint.h"

#ifdef OC_TCP
#include "api/oc_session_events_internal.h"
#include "port/oc_connectivity.h"
#endif /* OC_TCP */

#ifdef OC_SECURITY
#include "security/oc_tls_internal.h"
#endif /* OC_SECURITY */

#ifdef OC_CLIENT

#include "api/client/oc_client_cb_internal.h"
Expand All @@ -38,10 +50,6 @@
#include "messaging/coap/signal_internal.h"
#endif /* OC_TCP */

#ifdef OC_SECURITY
#include "security/oc_tls_internal.h"
#endif /* OC_SECURITY */

#include <assert.h>

typedef struct oc_dispatch_context_t
Expand Down Expand Up @@ -948,6 +956,8 @@ oc_do_ip_discovery_at_endpoint(const char *rt, oc_discovery_handler_t handler,
return status;
}

#endif /* OC_CLIENT */

void
oc_close_session(const oc_endpoint_t *endpoint)
{
Expand All @@ -957,9 +967,9 @@ oc_close_session(const oc_endpoint_t *endpoint)
#endif /* OC_SECURITY */
} else if (endpoint->flags & TCP) {
#ifdef OC_TCP
oc_connectivity_end_session(endpoint);
if (oc_connectivity_end_session_v1(endpoint, false)) {
oc_handle_session(endpoint, OC_SESSION_DISCONNECTED);
}
#endif /* OC_TCP */
}
}

#endif /* OC_CLIENT */
2 changes: 1 addition & 1 deletion api/oc_session_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ handle_session_disconnected(const oc_endpoint_t *endpoint)
(void)endpoint;
#ifdef OC_SECURITY
if ((endpoint->flags & SECURED) != 0 && (endpoint->flags & TCP) != 0) {
oc_tls_remove_peer(endpoint);
oc_tls_remove_peer(endpoint, false);
}
#endif /* OC_SECURITY */
#ifdef OC_SERVER
Expand Down
2 changes: 1 addition & 1 deletion api/plgd/plgd_time.c
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ plgd_time_fetch(plgd_time_fetch_config_t fetch, unsigned *flags)
OC_ERR("failed to send fetch plgd-time request to endpoint");
#if defined(OC_SECURITY) && defined(OC_PKI)
if (add_insecure_peer) {
oc_tls_remove_peer(fetch.endpoint);
oc_tls_remove_peer(fetch.endpoint, true);
}
#endif /* OC_SECURITY && OC_PKI */
oc_memb_free(&g_fetch_params_s, fetch_params);
Expand Down
2 changes: 1 addition & 1 deletion apps/client_multithread_linux.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pong_received_handler(oc_client_response_t *data)
ping_count++;
if (ping_count > PING_RETRY_COUNT) {
OC_PRINTF("retry over. close connection.\n");
oc_connectivity_end_session(data->endpoint);
oc_close_session(data->endpoint);
} else {
ping_timeout <<= 1;
OC_PRINTF("PING send again.[retry: %zd, time: %u]\n", ping_count,
Expand Down
2 changes: 1 addition & 1 deletion messaging/coap/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ coap_process_invalid_inbound_message(const coap_packet_t *packet,
#endif /* OC_SECURITY */
#ifdef OC_TCP
if ((msg->endpoint.flags & TCP) != 0) {
oc_connectivity_end_session(&msg->endpoint);
oc_close_session(&msg->endpoint);
return;
}
#endif /* OC_TCP */
Expand Down
3 changes: 2 additions & 1 deletion messaging/coap/signal.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "log_internal.h"
#include "signal_internal.h"
#include "coap_internal.h"
#include "oc_api.h"
#include "transactions_internal.h"
#include <string.h>

Expand Down Expand Up @@ -223,7 +224,7 @@ coap_signal_handle_message(const coap_packet_t *packet,
if (packet->code == RELEASE_7_04) {
// alternative address
// hold off
oc_connectivity_end_session(endpoint);
oc_close_session(endpoint);
return COAP_SIGNAL_DONE;
}

Expand Down
10 changes: 9 additions & 1 deletion port/android/ipadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1629,13 +1629,21 @@ oc_connectivity_shutdown(size_t device)
#ifdef OC_TCP
void
oc_connectivity_end_session(const oc_endpoint_t *endpoint)
{
oc_connectivity_end_session_v1(endpoint, true);
}

bool
oc_connectivity_end_session_v1(const oc_endpoint_t *endpoint,
bool notify_session_end)
{
if (endpoint->flags & TCP) {
ip_context_t *dev = get_ip_context_for_device(endpoint->device);
if (dev) {
oc_tcp_end_session(dev, endpoint);
return oc_tcp_end_session(dev, endpoint, notify_session_end);
}
}
return false;
}
#endif /* OC_TCP */

Expand Down
24 changes: 13 additions & 11 deletions port/android/tcpadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ oc_tcp_add_socks_to_fd_set(ip_context_t *dev)
}

static void
free_tcp_session(tcp_session_t *session)
free_tcp_session(tcp_session_t *session, bool notify_session_end)
{
oc_list_remove(session_list, session);

if (!oc_session_events_disconnect_is_ongoing()) {
if (!oc_session_events_disconnect_is_ongoing() && notify_session_end) {
oc_session_end_event(&session->endpoint);
}

Expand Down Expand Up @@ -401,13 +401,13 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
if (count < 0) {
OC_ERR("recv error! %d", errno);

free_tcp_session(session);
free_tcp_session(session, true);

ret_with_code(ADAPTER_STATUS_ERROR);
} else if (count == 0) {
OC_DBG("peer closed TCP session\n");

free_tcp_session(session);
free_tcp_session(session, true);

ret_with_code(ADAPTER_STATUS_NONE);
}
Expand All @@ -428,7 +428,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
oc_tcp_get_total_length_from_message_header(message);
if (length_from_header < 0) {
OC_ERR("invalid message size in header");
free_tcp_session(session);
free_tcp_session(session, true);
ret_with_code(ADAPTER_STATUS_ERROR);
}

Expand All @@ -438,7 +438,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
OC_ERR(
"total receive length(%zu) is bigger than message buffer size(%zu)",
total_length, oc_message_buffer_size(message));
free_tcp_session(session);
free_tcp_session(session, true);
ret_with_code(ADAPTER_STATUS_ERROR);
}
OC_DBG("tcp packet total length : %zu bytes.", total_length);
Expand All @@ -448,7 +448,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
} while (total_length > message->length);

if (!oc_tcp_is_valid_message(message)) {
free_tcp_session(session);
free_tcp_session(session, true);
return ADAPTER_STATUS_ERROR;
}

Expand All @@ -461,15 +461,17 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
return ret;
}

void
oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint)
bool
oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint,
bool notify_session_end)
{
pthread_mutex_lock(&dev->tcp.mutex);
tcp_session_t *session = find_session_by_endpoint(endpoint);
if (session) {
free_tcp_session(session);
free_tcp_session(session, notify_session_end);
}
pthread_mutex_unlock(&dev->tcp.mutex);
return session != NULL;
}

static int
Expand Down Expand Up @@ -764,7 +766,7 @@ oc_tcp_connectivity_shutdown(ip_context_t *dev)
while (session != NULL) {
next = session->next;
if (session->endpoint.device == dev->device) {
free_tcp_session(session);
free_tcp_session(session, true);
}
session = next;
}
Expand Down
3 changes: 2 additions & 1 deletion port/android/tcpadapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ void oc_tcp_add_socks_to_fd_set(ip_context_t *dev);
adapter_receive_state_t oc_tcp_receive_message(ip_context_t *dev, fd_set *fds,
oc_message_t *message);

void oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint);
bool oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint,
bool notify_session_end);

#ifdef __cplusplus
}
Expand Down
10 changes: 9 additions & 1 deletion port/esp32/adapter/src/ipadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1617,13 +1617,21 @@ oc_connectivity_shutdown(size_t device)
#ifdef OC_TCP
void
oc_connectivity_end_session(const oc_endpoint_t *endpoint)
{
oc_connectivity_end_session_v1(endpoint, true);
}

bool
oc_connectivity_end_session_v1(const oc_endpoint_t *endpoint,
bool notify_session_end)
{
if (endpoint->flags & TCP) {
ip_context_t *dev = get_ip_context_for_device(endpoint->device);
if (dev) {
oc_tcp_end_session(dev, endpoint);
return oc_tcp_end_session(dev, endpoint, notify_session_end);
}
}
return false;
}
#endif /* OC_TCP */

Expand Down
24 changes: 13 additions & 11 deletions port/esp32/adapter/src/tcpadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ oc_tcp_add_socks_to_fd_set(ip_context_t *dev)
}

static void
free_tcp_session(tcp_session_t *session)
free_tcp_session(tcp_session_t *session, bool notify_session_end)
{
oc_list_remove(session_list, session);

if (!oc_session_events_disconnect_is_ongoing()) {
if (!oc_session_events_disconnect_is_ongoing() && notify_session_end) {
oc_session_end_event(&session->endpoint);
}

Expand Down Expand Up @@ -374,13 +374,13 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
if (count < 0) {
OC_ERR("recv error! %d", errno);

free_tcp_session(session);
free_tcp_session(session, true);

ret_with_code(ADAPTER_STATUS_ERROR);
} else if (count == 0) {
OC_DBG("peer closed TCP session\n");

free_tcp_session(session);
free_tcp_session(session, true);

ret_with_code(ADAPTER_STATUS_NONE);
}
Expand All @@ -401,7 +401,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
oc_tcp_get_total_length_from_message_header(message);
if (length_from_header < 0) {
OC_ERR("invalid message size in header");
free_tcp_session(session);
free_tcp_session(session, true);
ret_with_code(ADAPTER_STATUS_ERROR);
}

Expand All @@ -411,7 +411,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
OC_ERR(
"total receive length(%zu) is bigger than message buffer size(%zu)",
total_length, oc_message_buffer_size(message));
free_tcp_session(session);
free_tcp_session(session, true);
ret_with_code(ADAPTER_STATUS_ERROR);
}
OC_DBG("tcp packet total length : %zu bytes.", total_length);
Expand All @@ -421,7 +421,7 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
} while (total_length > message->length);

if (!oc_tcp_is_valid_message(message)) {
free_tcp_session(session);
free_tcp_session(session, true);
ret_with_code(ADAPTER_STATUS_ERROR);
}

Expand All @@ -434,15 +434,17 @@ oc_tcp_receive_message(ip_context_t *dev, fd_set *fds, oc_message_t *message)
return ret;
}

void
oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint)
bool
oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint,
bool notify_session_end)
{
pthread_mutex_lock(&dev->tcp.mutex);
tcp_session_t *session = find_session_by_endpoint(endpoint);
if (session) {
free_tcp_session(session);
free_tcp_session(session, notify_session_end);
}
pthread_mutex_unlock(&dev->tcp.mutex);
return session != NULL;
}

static int
Expand Down Expand Up @@ -724,7 +726,7 @@ oc_tcp_connectivity_shutdown(ip_context_t *dev)
while (session != NULL) {
next = session->next;
if (session->endpoint.device == dev->device) {
free_tcp_session(session);
free_tcp_session(session, true);
}
session = next;
}
Expand Down
3 changes: 2 additions & 1 deletion port/esp32/adapter/src/tcpadapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ void oc_tcp_add_socks_to_fd_set(ip_context_t *dev);
adapter_receive_state_t oc_tcp_receive_message(ip_context_t *dev, fd_set *fds,
oc_message_t *message);

void oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint);
bool oc_tcp_end_session(ip_context_t *dev, const oc_endpoint_t *endpoint,
bool notify_session_end);

#ifdef __cplusplus
}
Expand Down
14 changes: 11 additions & 3 deletions port/linux/ipadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1577,12 +1577,20 @@ oc_connectivity_shutdown(size_t device)
}

#ifdef OC_TCP
void
oc_connectivity_end_session(const oc_endpoint_t *endpoint)
bool
oc_connectivity_end_session_v1(const oc_endpoint_t *endpoint,
bool notify_session_end)
{
if ((endpoint->flags & TCP) != 0 &&
oc_get_ip_context_for_device(endpoint->device) != NULL) {
tcp_end_session(endpoint);
return tcp_end_session(endpoint, notify_session_end);
}
return false;
}

void
oc_connectivity_end_session(const oc_endpoint_t *endpoint)
{
oc_connectivity_end_session_v1(endpoint, true);
}
#endif /* OC_TCP */
Loading

0 comments on commit c0ea5d3

Please sign in to comment.