Skip to content

Commit

Permalink
Merge pull request #422 from lf-lang/fed-socket-fixes
Browse files Browse the repository at this point in the history
RTI and federate socket fixes
  • Loading branch information
lhstrh authored May 13, 2024
2 parents 1bd5130 + 3fc51c4 commit 2318dad
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
19 changes: 15 additions & 4 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,10 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
// issue a TAG before this message has been forwarded.
LF_MUTEX_LOCK(&rti_mutex);

// If the destination federate is no longer connected, issue a warning
// and return.
// If the destination federate is no longer connected, issue a warning,
// remove the message from the socket and return.
federate_info_t* fed = GET_FED_INFO(federate_id);
if (fed->enclave.state == NOT_CONNECTED) {
LF_MUTEX_UNLOCK(&rti_mutex);
lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id);
LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", "
"completed " PRINTF_TAG ", "
Expand All @@ -401,6 +400,18 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep,
fed->enclave.last_provisionally_granted.time - start_time,
fed->enclave.last_provisionally_granted.microstep);
// If the message was larger than the buffer, we must empty out the remainder also.
size_t total_bytes_read = bytes_read;
while (total_bytes_read < total_bytes_to_read) {
bytes_to_read = total_bytes_to_read - total_bytes_read;
if (bytes_to_read > FED_COM_BUFFER_SIZE) {
bytes_to_read = FED_COM_BUFFER_SIZE;
}
read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, buffer, NULL,
"RTI failed to clear message chunks.");
total_bytes_read += bytes_to_read;
}
LF_MUTEX_UNLOCK(&rti_mutex);
return;
}

Expand Down Expand Up @@ -1073,7 +1084,7 @@ void* federate_info_thread_TCP(void* fed) {
int read_failed = read_from_socket(my_fed->socket, 1, buffer);
if (read_failed) {
// Socket is closed
lf_print_warning("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id);
lf_print_error("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id);
my_fed->enclave.state = NOT_CONNECTED;
my_fed->socket = -1;
// FIXME: We need better error handling here, but do not stop execution here.
Expand Down
9 changes: 5 additions & 4 deletions core/federated/network/net_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer) {
return -1;
}
ssize_t bytes_read = 0;
int retry_count = 0;
while (bytes_read < (ssize_t)num_bytes) {
ssize_t more = read(socket, buffer + bytes_read, num_bytes - (size_t)bytes_read);
if (more < 0 && retry_count++ < NUM_SOCKET_RETRIES && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
if (more < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
// Those error codes set by the socket indicates
// that we should try again (@see man errno).
lf_print_warning("Reading from socket failed. Will try again.");
LF_PRINT_DEBUG("Reading from socket %d failed with error: `%s`. Will try again.", socket, strerror(errno));
lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES);
continue;
} else if (more < 0) {
// A more serious error occurred.
lf_print_error("Reading from socket %d failed. With error: `%s`", socket, strerror(errno));
return -1;
} else if (more == 0) {
// EOF received.
Expand Down Expand Up @@ -173,11 +173,12 @@ int write_to_socket(int socket, size_t num_bytes, unsigned char* buffer) {
// The error codes EAGAIN or EWOULDBLOCK indicate
// that we should try again (@see man errno).
// The error code EINTR means the system call was interrupted before completing.
LF_PRINT_DEBUG("Writing to socket was blocked. Will try again.");
LF_PRINT_DEBUG("Writing to socket %d was blocked. Will try again.", socket);
lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES);
continue;
} else if (more < 0) {
// A more serious error occurred.
lf_print_error("Writing to socket %d failed. With error: `%s`", socket, strerror(errno));
return -1;
}
bytes_written += more;
Expand Down

0 comments on commit 2318dad

Please sign in to comment.