From ff1c658592a1ee6aa5bbc0b9f0470ae94a4b9a77 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Sat, 1 Jun 2024 15:58:45 +0200 Subject: [PATCH] fixup! tcp: pair connection with the tls peer --- port/android/tcpadapter.c | 38 ++++++++++++++---- port/esp32/adapter/src/tcpadapter.c | 38 ++++++++++++++---- port/linux/tcpsession.c | 60 +++++++++++++++++++++++++---- port/oc_connectivity_internal.h | 11 ++++++ port/unittest/connectivitytest.cpp | 37 +++++++++++++++++- port/windows/tcpadapter.c | 42 ++++++++++++++++---- 6 files changed, 197 insertions(+), 29 deletions(-) diff --git a/port/android/tcpadapter.c b/port/android/tcpadapter.c index c4d805ef9..ccb81ec61 100644 --- a/port/android/tcpadapter.c +++ b/port/android/tcpadapter.c @@ -28,6 +28,7 @@ #include "oc_endpoint.h" #include "oc_session_events.h" #include "port/oc_assert.h" +#include "port/oc_connectivity_internal.h" #include "port/oc_log_internal.h" #include "port/oc_tcp_socket_internal.h" #include "tcpadapter.h" @@ -228,18 +229,16 @@ add_new_session(int sock, ip_context_t *dev, oc_endpoint_t *endpoint, return -1; } - endpoint->interface_index = (unsigned)if_index; - session->dev = dev; + endpoint->interface_index = (unsigned)if_index; + if (session_id == 0) { + session_id = oc_tcp_get_new_session_id(); + } + endpoint->session_id = session_id; memcpy(&session->endpoint, endpoint, sizeof(oc_endpoint_t)); session->endpoint.next = NULL; session->sock = sock; session->csm_state = state; - if (session_id == 0) { - session->endpoint.session_id = oc_tcp_get_new_session_id(); - } else { - session->endpoint.session_id = session_id; - } oc_list_add(session_list, session); @@ -315,6 +314,22 @@ find_session_by_endpoint(const oc_endpoint_t *endpoint) return session; } +static tcp_session_t * +find_session_by_id(uint32_t session_id) +{ + tcp_session_t *session = oc_list_head(session_list); + while (session != NULL && session->endpoint.session_id != session_id) { + session = session->next; + } + + if (!session) { + OC_DBG("could not find ongoing TCP session for session id %d", session_id); + return NULL; + } + OC_DBG("found TCP session for session id %d", session_id); + return session; +} + static tcp_session_t * get_ready_to_read_session(fd_set *setfds) { @@ -796,6 +811,15 @@ oc_tcp_connection_state(const oc_endpoint_t *endpoint) return -1; } +int +oc_tcp_session_state(uint32_t session_id) +{ + if (find_session_by_id(session_id) != NULL) { + return OC_TCP_SOCKET_STATE_CONNECTED; + } + return -1; +} + tcp_csm_state_t oc_tcp_get_csm_state(const oc_endpoint_t *endpoint) { diff --git a/port/esp32/adapter/src/tcpadapter.c b/port/esp32/adapter/src/tcpadapter.c index 577d36f04..b07c29e2c 100644 --- a/port/esp32/adapter/src/tcpadapter.c +++ b/port/esp32/adapter/src/tcpadapter.c @@ -28,6 +28,7 @@ #include "oc_endpoint.h" #include "oc_session_events.h" #include "port/oc_assert.h" +#include "port/oc_connectivity_internal.h" #include "port/oc_log_internal.h" #include "port/oc_tcp_socket_internal.h" #include "tcpadapter.h" @@ -198,18 +199,16 @@ add_new_session(int sock, ip_context_t *dev, oc_endpoint_t *endpoint, return -1; } - endpoint->interface_index = (unsigned)if_index; - session->dev = dev; + endpoint->interface_index = (unsigned)if_index; + if (session_id == 0) { + session_id = oc_tcp_get_new_session_id(); + } + endpoint->session_id = session_id; memcpy(&session->endpoint, endpoint, sizeof(oc_endpoint_t)); session->endpoint.next = NULL; session->sock = sock; session->csm_state = state; - if (session_id == 0) { - session->endpoint.session_id = oc_tcp_get_new_session_id(); - } else { - session->endpoint.session_id = session_id; - } oc_list_add(session_list, session); @@ -286,6 +285,22 @@ find_session_by_endpoint(const oc_endpoint_t *endpoint) return session; } +static tcp_session_t * +find_session_by_id(uint32_t session_id) +{ + tcp_session_t *session = oc_list_head(session_list); + while (session != NULL && session->endpoint.session_id != session_id) { + session = session->next; + } + + if (!session) { + OC_DBG("could not find ongoing TCP session for session id %d", session_id); + return NULL; + } + OC_DBG("found TCP session for session id %d", session_id); + return session; +} + static tcp_session_t * get_ready_to_read_session(fd_set *setfds) { @@ -756,6 +771,15 @@ oc_tcp_connection_state(const oc_endpoint_t *endpoint) return -1; } +int +oc_tcp_session_state(uint32_t session_id) +{ + if (find_session_by_id(session_id) != NULL) { + return OC_TCP_SOCKET_STATE_CONNECTED; + } + return -1; +} + tcp_csm_state_t oc_tcp_get_csm_state(const oc_endpoint_t *endpoint) { diff --git a/port/linux/tcpsession.c b/port/linux/tcpsession.c index 5d535f11c..4199e259f 100644 --- a/port/linux/tcpsession.c +++ b/port/linux/tcpsession.c @@ -205,6 +205,7 @@ get_interface_index(int sock) static void log_new_session(oc_endpoint_t *endpoint, int sock, bool is_connected) { + // GCOVR_EXCL_START oc_string64_t ep; const char *addr = ""; if (oc_endpoint_to_string64(endpoint, &ep)) { @@ -214,11 +215,13 @@ log_new_session(oc_endpoint_t *endpoint, int sock, bool is_connected) "connected: %d, session_id: %u", addr, endpoint->interface_index, sock, (int)is_connected, (unsigned)endpoint->session_id); + // GCOVR_EXCL_STOP } static void log_free_session(oc_endpoint_t *endpoint, int sock) { + // GCOVR_EXCL_START oc_string64_t ep; const char *addr = ""; if (oc_endpoint_to_string64(endpoint, &ep)) { @@ -227,6 +230,7 @@ log_free_session(oc_endpoint_t *endpoint, int sock) OC_DBG("free TCP session endpoint: %s, endpoint interface: %d, sock: %d, " "session_id: %u", addr, endpoint->interface_index, sock, (unsigned)endpoint->session_id); + // GCOVR_EXCL_STOP } #endif /* OC_DBG_IS_ENABLED */ @@ -250,16 +254,16 @@ add_new_session_locked(int sock, ip_context_t *dev, oc_endpoint_t *endpoint, session->dev = dev; endpoint->interface_index = (unsigned)if_index; + if (session_id == 0) { + session_id = oc_tcp_get_new_session_id(); + } + endpoint->session_id = session_id; memcpy(&session->endpoint, endpoint, sizeof(oc_endpoint_t)); session->endpoint.next = NULL; + session->endpoint.interface_index = (unsigned)if_index; session->sock = sock; session->csm_state = state; session->notify_session_end = true; - if (session_id == 0) { - session->endpoint.session_id = oc_tcp_get_new_session_id(); - } else { - session->endpoint.session_id = session_id; - } oc_list_add(g_session_list, session); @@ -304,8 +308,8 @@ accept_new_session_locked(ip_context_t *dev, int fd, fd_set *setfds, #endif /* !OC_IPV4 */ } - if (add_new_session_locked(new_socket, dev, endpoint, /*session_id*/ 0, - CSM_NONE) == NULL) { + if (add_new_session_locked(new_socket, dev, endpoint, + /*session_id*/ 0, CSM_NONE) == NULL) { OC_ERR("could not record new TCP session"); close(new_socket); return -1; @@ -551,6 +555,16 @@ find_session_by_endpoint_locked(const oc_endpoint_t *endpoint) return session; } +static tcp_session_t * +find_session_by_id_locked(uint32_t session_id) +{ + tcp_session_t *session = oc_list_head(g_session_list); + while (session != NULL && session->endpoint.session_id != session_id) { + session = session->next; + } + return session; +} + #ifdef OC_HAS_FEATURE_TCP_ASYNC_CONNECT static tcp_session_t * @@ -586,6 +600,17 @@ find_waiting_session_by_endpoint_locked(const oc_endpoint_t *endpoint) return ws; } +static tcp_waiting_session_t * +find_waiting_session_by_id_locked(uint32_t session_id) +{ + tcp_waiting_session_t *ws = + (tcp_waiting_session_t *)oc_list_head(g_waiting_session_list); + while (ws != NULL && ws->endpoint.session_id != session_id) { + ws = ws->next; + } + return ws; +} + static tcp_waiting_session_t * add_new_waiting_session_locked(int sock, ip_context_t *dev, const oc_endpoint_t *endpoint, @@ -879,6 +904,24 @@ oc_tcp_connection_state(const oc_endpoint_t *endpoint) return -1; } +int +oc_tcp_session_state(uint32_t session_id) +{ + pthread_mutex_lock(&g_mutex); + if (find_session_by_id_locked(session_id) != NULL) { + pthread_mutex_unlock(&g_mutex); + return OC_TCP_SOCKET_STATE_CONNECTED; + } +#ifdef OC_HAS_FEATURE_TCP_ASYNC_CONNECT + if (find_waiting_session_by_id_locked(session_id) != NULL) { + pthread_mutex_unlock(&g_mutex); + return OC_TCP_SOCKET_STATE_CONNECTING; + } +#endif /* OC_HAS_FEATURE_TCP_ASYNC_CONNECT */ + pthread_mutex_unlock(&g_mutex); + return -1; +} + static int tcp_send_message(int sockfd, const oc_message_t *message) { @@ -1370,6 +1413,9 @@ oc_tcp_connect(oc_endpoint_t *endpoint, on_tcp_connect_t on_tcp_connect, { oc_tcp_connect_result_t ret = oc_tcp_connect_to_endpoint(endpoint, on_tcp_connect, on_tcp_connect_data); + if (ret.session_id != 0) { + endpoint->session_id = ret.session_id; + } return ret.error != 0 ? ret.error : (int)ret.state; } diff --git a/port/oc_connectivity_internal.h b/port/oc_connectivity_internal.h index 243848984..413cab1b0 100644 --- a/port/oc_connectivity_internal.h +++ b/port/oc_connectivity_internal.h @@ -130,6 +130,17 @@ void oc_tcp_set_connect_retry(uint8_t max_count, uint16_t timeout); #endif /* OC_HAS_FEATURE_TCP_ASYNC_CONNECT */ +/** + * @brief Get state of TCP connection for given session id. + * + * @param session_id session id + * @return OC_TCP_SOCKET_STATE_CONNECTED TCP connection exists and it is ongoing + * @return OC_TCP_SOCKET_STATE_CONNECTING TCP connection is waiting to be + * established + * @return -1 otherwise + */ +int oc_tcp_session_state(uint32_t session_id); + #endif /* OC_TCP */ #ifdef __cplusplus diff --git a/port/unittest/connectivitytest.cpp b/port/unittest/connectivitytest.cpp index f5a747cee..52b7b3b1d 100644 --- a/port/unittest/connectivitytest.cpp +++ b/port/unittest/connectivitytest.cpp @@ -42,6 +42,8 @@ #include #include +using namespace std::chrono_literals; + static constexpr size_t kDeviceID = 0; class TestConnectivity : public testing::Test { @@ -462,11 +464,14 @@ TEST_F(TestConnectivityWithServer, oc_tcp_update_csm_state_P) coap_serialize_message(&packet, msg->data, oc_message_buffer_size(msg)); oc_send_buffer(msg); + ep.session_id = msg->endpoint.session_id; oc_message_unref(msg); #endif /* OC_HAS_FEATURE_TCP_ASYNC_CONNECT */ #ifdef OC_TCP EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTED, oc_tcp_connection_state(&ep)); + ASSERT_NE(0, ep.session_id); + EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTED, oc_tcp_session_state(ep.session_id)); #endif /* OC_TCP */ EXPECT_EQ(0, oc_tcp_update_csm_state(&ep, CSM_DONE)); @@ -526,7 +531,7 @@ TEST_F(TestConnectivityWithServer, oc_tcp_connect_timeout) return; } - EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret); + ASSERT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret); oc_message_t *msg = oc_allocate_message(); memcpy(&msg->endpoint, &ep, sizeof(oc_endpoint_t)); @@ -544,6 +549,36 @@ TEST_F(TestConnectivityWithServer, oc_tcp_connect_timeout) restore_defaults(); } +TEST_F(TestConnectivityWithServer, oc_tcp_cleanup_waiting_session) +{ + auto addr = "coap+tcp://[::1]:12345"; + oc_endpoint_t ep1 = + oc::endpoint::FromString(addr); // reachable address, but inactive port + + oc_tcp_connect_result_t ret1 = + oc_tcp_connect_to_endpoint(&ep1, nullptr, nullptr); + ASSERT_EQ(0, ret1.error); + ASSERT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret1.state); + ASSERT_NE(0, ret1.session_id); + + // disconnect is asynchronous, we should be able to open a new session to + // the same endpoint + oc_close_session(&ep1); + + oc_endpoint_t ep2 = oc::endpoint::FromString(addr); + oc_tcp_connect_result_t ret2 = + oc_tcp_connect_to_endpoint(&ep2, nullptr, nullptr); + ASSERT_EQ(0, ret2.error); + ASSERT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret2.state); + ASSERT_NE(0, ret2.session_id); + + oc::TestDevice::PoolEventsMsV1(20ms); + + EXPECT_EQ(-1, oc_tcp_session_state(ret1.session_id)); + EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, + oc_tcp_session_state(ret2.session_id)); +} + #endif /* __linux__ */ TEST_F(TestConnectivityWithServer, oc_tcp_connect_repeat_fail) diff --git a/port/windows/tcpadapter.c b/port/windows/tcpadapter.c index 0ae0ae622..ed53f61a2 100644 --- a/port/windows/tcpadapter.c +++ b/port/windows/tcpadapter.c @@ -22,9 +22,9 @@ #include "api/oc_session_events_internal.h" #include "api/oc_tcp_internal.h" #include "port/oc_assert.h" +#include "port/oc_connectivity_internal.h" #include "port/oc_fcntl_internal.h" #include "port/oc_log_internal.h" -#include "util/oc_memb.h" #include "port/oc_tcp_socket_internal.h" #include "ipcontext.h" #include "messaging/coap/coap_internal.h" @@ -33,6 +33,7 @@ #include "oc_endpoint.h" #include "oc_session_events.h" #include "tcpadapter.h" +#include "util/oc_memb.h" #include #include @@ -237,19 +238,18 @@ add_new_session_locked(SOCKET sock, ip_context_t *dev, oc_endpoint_t *endpoint, return SOCKET_ERROR; } + session->dev = dev; endpoint->interface_index = (unsigned)if_index; + if (session_id == 0) { + session_id = oc_tcp_get_new_session_id(); + } + endpoint->session_id = session_id; memcpy(&session->endpoint, endpoint, sizeof(oc_endpoint_t)); - session->dev = dev; session->endpoint.next = NULL; session->sock = sock; session->csm_state = state; session->sock_event = sock_event; session->notify_session_end = true; - if (session_id == 0) { - session->endpoint.session_id = oc_tcp_get_new_session_id(); - } else { - session->endpoint.session_id = session_id; - } oc_list_add(session_list, session); @@ -330,6 +330,22 @@ find_session_by_endpoint_locked(const oc_endpoint_t *endpoint) return session; } +static tcp_session_t * +find_session_by_id_locked(uint32_t session_id) +{ + tcp_session_t *session = oc_list_head(session_list); + while (session != NULL && session->endpoint.session_id != session_id) { + session = session->next; + } + + if (!session) { + OC_DBG("could not find ongoing TCP session for session id %d", session_id); + return NULL; + } + OC_DBG("found TCP session for session id %d", session_id); + return session; +} + bool oc_tcp_end_session(const oc_endpoint_t *endpoint, bool notify_session_end, oc_endpoint_t *session_endpoint) @@ -1016,6 +1032,18 @@ oc_tcp_connection_state(const oc_endpoint_t *endpoint) return -1; } +int +oc_tcp_session_state(uint32_t session_id) +{ + oc_tcp_adapter_mutex_lock(); + tcp_session_t *session = find_session_by_id_locked(session_id); + oc_tcp_adapter_mutex_unlock(); + if (session != NULL) { + return OC_TCP_SOCKET_STATE_CONNECTED; + } + return -1; +} + tcp_csm_state_t oc_tcp_get_csm_state(const oc_endpoint_t *endpoint) {