Skip to content

Commit

Permalink
fixup! tcp: pair connection with the tls peer
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Jun 3, 2024
1 parent 629c3d9 commit ff1c658
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 29 deletions.
38 changes: 31 additions & 7 deletions port/android/tcpadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "oc_endpoint.h"
#include "oc_session_events.h"
#include "port/oc_assert.h"
#include "port/oc_connectivity_internal.h"
#include "port/oc_log_internal.h"
#include "port/oc_tcp_socket_internal.h"
#include "tcpadapter.h"
Expand Down Expand Up @@ -228,18 +229,16 @@ add_new_session(int sock, ip_context_t *dev, oc_endpoint_t *endpoint,
return -1;
}

endpoint->interface_index = (unsigned)if_index;

session->dev = dev;
endpoint->interface_index = (unsigned)if_index;
if (session_id == 0) {
session_id = oc_tcp_get_new_session_id();
}
endpoint->session_id = session_id;
memcpy(&session->endpoint, endpoint, sizeof(oc_endpoint_t));
session->endpoint.next = NULL;
session->sock = sock;
session->csm_state = state;
if (session_id == 0) {
session->endpoint.session_id = oc_tcp_get_new_session_id();
} else {
session->endpoint.session_id = session_id;
}

oc_list_add(session_list, session);

Expand Down Expand Up @@ -315,6 +314,22 @@ find_session_by_endpoint(const oc_endpoint_t *endpoint)
return session;
}

static tcp_session_t *
find_session_by_id(uint32_t session_id)
{
tcp_session_t *session = oc_list_head(session_list);
while (session != NULL && session->endpoint.session_id != session_id) {
session = session->next;
}

if (!session) {
OC_DBG("could not find ongoing TCP session for session id %d", session_id);
return NULL;
}
OC_DBG("found TCP session for session id %d", session_id);
return session;
}

static tcp_session_t *
get_ready_to_read_session(fd_set *setfds)
{
Expand Down Expand Up @@ -796,6 +811,15 @@ oc_tcp_connection_state(const oc_endpoint_t *endpoint)
return -1;
}

int
oc_tcp_session_state(uint32_t session_id)
{
if (find_session_by_id(session_id) != NULL) {
return OC_TCP_SOCKET_STATE_CONNECTED;
}
return -1;
}

tcp_csm_state_t
oc_tcp_get_csm_state(const oc_endpoint_t *endpoint)
{
Expand Down
38 changes: 31 additions & 7 deletions port/esp32/adapter/src/tcpadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "oc_endpoint.h"
#include "oc_session_events.h"
#include "port/oc_assert.h"
#include "port/oc_connectivity_internal.h"
#include "port/oc_log_internal.h"
#include "port/oc_tcp_socket_internal.h"
#include "tcpadapter.h"
Expand Down Expand Up @@ -198,18 +199,16 @@ add_new_session(int sock, ip_context_t *dev, oc_endpoint_t *endpoint,
return -1;
}

endpoint->interface_index = (unsigned)if_index;

session->dev = dev;
endpoint->interface_index = (unsigned)if_index;
if (session_id == 0) {
session_id = oc_tcp_get_new_session_id();
}
endpoint->session_id = session_id;
memcpy(&session->endpoint, endpoint, sizeof(oc_endpoint_t));
session->endpoint.next = NULL;
session->sock = sock;
session->csm_state = state;
if (session_id == 0) {
session->endpoint.session_id = oc_tcp_get_new_session_id();
} else {
session->endpoint.session_id = session_id;
}

oc_list_add(session_list, session);

Expand Down Expand Up @@ -286,6 +285,22 @@ find_session_by_endpoint(const oc_endpoint_t *endpoint)
return session;
}

static tcp_session_t *
find_session_by_id(uint32_t session_id)
{
tcp_session_t *session = oc_list_head(session_list);
while (session != NULL && session->endpoint.session_id != session_id) {
session = session->next;
}

if (!session) {
OC_DBG("could not find ongoing TCP session for session id %d", session_id);
return NULL;
}
OC_DBG("found TCP session for session id %d", session_id);
return session;
}

static tcp_session_t *
get_ready_to_read_session(fd_set *setfds)
{
Expand Down Expand Up @@ -756,6 +771,15 @@ oc_tcp_connection_state(const oc_endpoint_t *endpoint)
return -1;
}

int
oc_tcp_session_state(uint32_t session_id)
{
if (find_session_by_id(session_id) != NULL) {
return OC_TCP_SOCKET_STATE_CONNECTED;
}
return -1;
}

tcp_csm_state_t
oc_tcp_get_csm_state(const oc_endpoint_t *endpoint)
{
Expand Down
60 changes: 53 additions & 7 deletions port/linux/tcpsession.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ get_interface_index(int sock)
static void
log_new_session(oc_endpoint_t *endpoint, int sock, bool is_connected)
{
// GCOVR_EXCL_START
oc_string64_t ep;
const char *addr = "";
if (oc_endpoint_to_string64(endpoint, &ep)) {
Expand All @@ -214,11 +215,13 @@ log_new_session(oc_endpoint_t *endpoint, int sock, bool is_connected)
"connected: %d, session_id: %u",
addr, endpoint->interface_index, sock, (int)is_connected,
(unsigned)endpoint->session_id);
// GCOVR_EXCL_STOP
}

static void
log_free_session(oc_endpoint_t *endpoint, int sock)
{
// GCOVR_EXCL_START
oc_string64_t ep;
const char *addr = "";
if (oc_endpoint_to_string64(endpoint, &ep)) {
Expand All @@ -227,6 +230,7 @@ log_free_session(oc_endpoint_t *endpoint, int sock)
OC_DBG("free TCP session endpoint: %s, endpoint interface: %d, sock: %d, "
"session_id: %u",
addr, endpoint->interface_index, sock, (unsigned)endpoint->session_id);
// GCOVR_EXCL_STOP
}

#endif /* OC_DBG_IS_ENABLED */
Expand All @@ -250,16 +254,16 @@ add_new_session_locked(int sock, ip_context_t *dev, oc_endpoint_t *endpoint,

session->dev = dev;
endpoint->interface_index = (unsigned)if_index;
if (session_id == 0) {
session_id = oc_tcp_get_new_session_id();
}
endpoint->session_id = session_id;
memcpy(&session->endpoint, endpoint, sizeof(oc_endpoint_t));
session->endpoint.next = NULL;
session->endpoint.interface_index = (unsigned)if_index;
session->sock = sock;
session->csm_state = state;
session->notify_session_end = true;
if (session_id == 0) {
session->endpoint.session_id = oc_tcp_get_new_session_id();
} else {
session->endpoint.session_id = session_id;
}

oc_list_add(g_session_list, session);

Expand Down Expand Up @@ -304,8 +308,8 @@ accept_new_session_locked(ip_context_t *dev, int fd, fd_set *setfds,
#endif /* !OC_IPV4 */
}

if (add_new_session_locked(new_socket, dev, endpoint, /*session_id*/ 0,
CSM_NONE) == NULL) {
if (add_new_session_locked(new_socket, dev, endpoint,
/*session_id*/ 0, CSM_NONE) == NULL) {
OC_ERR("could not record new TCP session");
close(new_socket);
return -1;
Expand Down Expand Up @@ -551,6 +555,16 @@ find_session_by_endpoint_locked(const oc_endpoint_t *endpoint)
return session;
}

static tcp_session_t *
find_session_by_id_locked(uint32_t session_id)
{
tcp_session_t *session = oc_list_head(g_session_list);
while (session != NULL && session->endpoint.session_id != session_id) {
session = session->next;
}
return session;
}

#ifdef OC_HAS_FEATURE_TCP_ASYNC_CONNECT

static tcp_session_t *
Expand Down Expand Up @@ -586,6 +600,17 @@ find_waiting_session_by_endpoint_locked(const oc_endpoint_t *endpoint)
return ws;
}

static tcp_waiting_session_t *
find_waiting_session_by_id_locked(uint32_t session_id)
{
tcp_waiting_session_t *ws =
(tcp_waiting_session_t *)oc_list_head(g_waiting_session_list);
while (ws != NULL && ws->endpoint.session_id != session_id) {
ws = ws->next;
}
return ws;
}

static tcp_waiting_session_t *
add_new_waiting_session_locked(int sock, ip_context_t *dev,
const oc_endpoint_t *endpoint,
Expand Down Expand Up @@ -879,6 +904,24 @@ oc_tcp_connection_state(const oc_endpoint_t *endpoint)
return -1;
}

int
oc_tcp_session_state(uint32_t session_id)
{
pthread_mutex_lock(&g_mutex);
if (find_session_by_id_locked(session_id) != NULL) {
pthread_mutex_unlock(&g_mutex);
return OC_TCP_SOCKET_STATE_CONNECTED;
}
#ifdef OC_HAS_FEATURE_TCP_ASYNC_CONNECT
if (find_waiting_session_by_id_locked(session_id) != NULL) {
pthread_mutex_unlock(&g_mutex);
return OC_TCP_SOCKET_STATE_CONNECTING;
}
#endif /* OC_HAS_FEATURE_TCP_ASYNC_CONNECT */
pthread_mutex_unlock(&g_mutex);
return -1;
}

static int
tcp_send_message(int sockfd, const oc_message_t *message)
{
Expand Down Expand Up @@ -1370,6 +1413,9 @@ oc_tcp_connect(oc_endpoint_t *endpoint, on_tcp_connect_t on_tcp_connect,
{
oc_tcp_connect_result_t ret =
oc_tcp_connect_to_endpoint(endpoint, on_tcp_connect, on_tcp_connect_data);
if (ret.session_id != 0) {
endpoint->session_id = ret.session_id;
}
return ret.error != 0 ? ret.error : (int)ret.state;
}

Expand Down
11 changes: 11 additions & 0 deletions port/oc_connectivity_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ void oc_tcp_set_connect_retry(uint8_t max_count, uint16_t timeout);

#endif /* OC_HAS_FEATURE_TCP_ASYNC_CONNECT */

/**
* @brief Get state of TCP connection for given session id.
*
* @param session_id session id
* @return OC_TCP_SOCKET_STATE_CONNECTED TCP connection exists and it is ongoing
* @return OC_TCP_SOCKET_STATE_CONNECTING TCP connection is waiting to be
* established
* @return -1 otherwise
*/
int oc_tcp_session_state(uint32_t session_id);

#endif /* OC_TCP */

#ifdef __cplusplus
Expand Down
37 changes: 36 additions & 1 deletion port/unittest/connectivitytest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#include <optional>
#include <string>

using namespace std::chrono_literals;

static constexpr size_t kDeviceID = 0;

class TestConnectivity : public testing::Test {
Expand Down Expand Up @@ -462,11 +464,14 @@ TEST_F(TestConnectivityWithServer, oc_tcp_update_csm_state_P)
coap_serialize_message(&packet, msg->data, oc_message_buffer_size(msg));

oc_send_buffer(msg);
ep.session_id = msg->endpoint.session_id;
oc_message_unref(msg);
#endif /* OC_HAS_FEATURE_TCP_ASYNC_CONNECT */

#ifdef OC_TCP
EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTED, oc_tcp_connection_state(&ep));
ASSERT_NE(0, ep.session_id);
EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTED, oc_tcp_session_state(ep.session_id));
#endif /* OC_TCP */

EXPECT_EQ(0, oc_tcp_update_csm_state(&ep, CSM_DONE));
Expand Down Expand Up @@ -526,7 +531,7 @@ TEST_F(TestConnectivityWithServer, oc_tcp_connect_timeout)
return;
}

EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret);
ASSERT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret);

oc_message_t *msg = oc_allocate_message();
memcpy(&msg->endpoint, &ep, sizeof(oc_endpoint_t));
Expand All @@ -544,6 +549,36 @@ TEST_F(TestConnectivityWithServer, oc_tcp_connect_timeout)
restore_defaults();
}

TEST_F(TestConnectivityWithServer, oc_tcp_cleanup_waiting_session)
{
auto addr = "coap+tcp://[::1]:12345";
oc_endpoint_t ep1 =
oc::endpoint::FromString(addr); // reachable address, but inactive port

oc_tcp_connect_result_t ret1 =
oc_tcp_connect_to_endpoint(&ep1, nullptr, nullptr);
ASSERT_EQ(0, ret1.error);
ASSERT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret1.state);
ASSERT_NE(0, ret1.session_id);

// disconnect is asynchronous, we should be able to open a new session to
// the same endpoint
oc_close_session(&ep1);

oc_endpoint_t ep2 = oc::endpoint::FromString(addr);
oc_tcp_connect_result_t ret2 =
oc_tcp_connect_to_endpoint(&ep2, nullptr, nullptr);
ASSERT_EQ(0, ret2.error);
ASSERT_EQ(OC_TCP_SOCKET_STATE_CONNECTING, ret2.state);
ASSERT_NE(0, ret2.session_id);

oc::TestDevice::PoolEventsMsV1(20ms);

EXPECT_EQ(-1, oc_tcp_session_state(ret1.session_id));
EXPECT_EQ(OC_TCP_SOCKET_STATE_CONNECTING,
oc_tcp_session_state(ret2.session_id));
}

#endif /* __linux__ */

TEST_F(TestConnectivityWithServer, oc_tcp_connect_repeat_fail)
Expand Down
Loading

0 comments on commit ff1c658

Please sign in to comment.