From 2c9e48fe0d4a925c28751deda16e18e90390d3cf Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Tue, 14 May 2024 16:07:26 +0200 Subject: [PATCH 01/36] Very incomplete start --- core/threaded/scheduler_GEDF_NP.c | 53 +++++++++------------- include/core/threaded/reactor_threaded.h | 1 + include/core/threaded/scheduler_instance.h | 2 +- 3 files changed, 23 insertions(+), 33 deletions(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index d590adecb..d45a31507 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -31,18 +31,15 @@ /////////////////// Scheduler Private API ///////////////////////// /** - * @brief Insert 'reaction' into scheduler->triggered_reactions - * at the appropriate level. - * + * @brief Insert 'reaction' into scheduler->triggered_reactions, the reaction queue. * @param reaction The reaction to insert. */ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction_t* reaction) { - size_t reaction_level = LF_LEVEL(reaction->index); - LF_PRINT_DEBUG("Scheduler: Trying to lock the mutex for level %zu.", reaction_level); - LF_MUTEX_LOCK(&scheduler->array_of_mutexes[reaction_level]); - LF_PRINT_DEBUG("Scheduler: Locked the mutex for level %zu.", reaction_level); - pqueue_insert(((pqueue_t**)scheduler->triggered_reactions)[reaction_level], (void*)reaction); - LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[reaction_level]); + LF_PRINT_DEBUG("Scheduler: Locking mutex for reaction queue."); + LF_MUTEX_LOCK(&scheduler->array_of_mutexes[0]); + LF_PRINT_DEBUG("Scheduler: Locked mutex for reaction queue."); + pqueue_insert(((pqueue_t**)scheduler->triggered_reactions)[0], (void*)reaction); + LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[0]); } /** @@ -54,8 +51,8 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction */ int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { pqueue_t* tmp_queue = NULL; - // Note: All the threads are idle, which means that they are done inserting - // reactions. Therefore, the reaction queues can be accessed without locking + // Note: All the worker threads are idle, which means that they are done inserting + // reactions. Therefore, the reaction queue can be accessed without locking // a mutex. while (scheduler->next_reaction_level <= scheduler->max_reaction_level) { @@ -195,24 +192,16 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* } lf_scheduler_t* scheduler = env->scheduler; - scheduler->triggered_reactions = calloc((scheduler->max_reaction_level + 1), sizeof(pqueue_t*)); - - scheduler->array_of_mutexes = (lf_mutex_t*)calloc((scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); + // Just one reaction queue and mutex for each environment. + scheduler->triggered_reactions = calloc(1, sizeof(pqueue_t*)); + scheduler->array_of_mutexes = (lf_mutex_t*)calloc(1, sizeof(lf_mutex_t)); - size_t queue_size = INITIAL_REACT_QUEUE_SIZE; - for (size_t i = 0; i <= scheduler->max_reaction_level; i++) { - if (params != NULL) { - if (params->num_reactions_per_level != NULL) { - queue_size = params->num_reactions_per_level[i]; - } - } - // Initialize the reaction queues - ((pqueue_t**)scheduler->triggered_reactions)[i] = - pqueue_init(queue_size, in_reverse_order, get_reaction_index, get_reaction_position, set_reaction_position, - reaction_matches, print_reaction); - // Initialize the mutexes for the reaction queues - LF_MUTEX_INIT(&scheduler->array_of_mutexes[i]); - } + // Initialize the reaction queue. + ((pqueue_t**)scheduler->triggered_reactions)[0] = + pqueue_init(queue_size, in_reverse_order, get_reaction_index, get_reaction_position, set_reaction_position, + reaction_matches, print_reaction); + // Initialize the mutexes for the reaction queues + LF_MUTEX_INIT(&scheduler->array_of_mutexes[0]); scheduler->executing_reactions = ((pqueue_t**)scheduler->triggered_reactions)[0]; } @@ -248,11 +237,11 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu while (!scheduler->should_stop) { // Need to lock the mutex for the current level size_t current_level = scheduler->next_reaction_level - 1; - LF_PRINT_DEBUG("Scheduler: Worker %d trying to lock the mutex for level %zu.", worker_number, current_level); - LF_MUTEX_LOCK(&scheduler->array_of_mutexes[current_level]); - LF_PRINT_DEBUG("Scheduler: Worker %d locked the mutex for level %zu.", worker_number, current_level); + LF_PRINT_DEBUG("Scheduler: Worker %d locking reaction queue mutex.", worker_number); + LF_MUTEX_LOCK(&scheduler->array_of_mutexes[0]); + LF_PRINT_DEBUG("Scheduler: Worker %d locked reaction queue mutex.", worker_number); reaction_t* reaction_to_return = (reaction_t*)pqueue_pop((pqueue_t*)scheduler->executing_reactions); - LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[current_level]); + LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[0]); if (reaction_to_return != NULL) { // Got a reaction diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 96de7ac49..4fa9b07a0 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -14,6 +14,7 @@ /** * @brief Advance to the next level. + * * For federated runtimes, this function should * stall the advance until we know that we can safely execute the next level * given knowledge about upstream network port statuses. diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index f664066e6..fa6255af3 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -90,7 +90,7 @@ typedef struct lf_scheduler_t { void* triggered_reactions; /** - * @brief An array of mutexes. + * @brief Mutex used to protect the reaction queue. * * Can be used to avoid race conditions. Schedulers are allowed to * initialize as many mutexes as they deem fit. From 6111c76e0b8469e4445eec5a8267c676d1745216 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Mon, 20 May 2024 18:29:12 +0200 Subject: [PATCH 02/36] Steps towards revised GEDF --- core/threaded/scheduler_GEDF_NP.c | 59 +++++++++++++------------------ core/threaded/scheduler_NP.c | 6 ++-- 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index d45a31507..04e84e6d5 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -42,41 +42,12 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[0]); } -/** - * @brief Distribute any reaction that is ready to execute to idle worker - * thread(s). - * - * @return Number of reactions that were successfully distributed to worker - * threads. - */ -int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { - pqueue_t* tmp_queue = NULL; - // Note: All the worker threads are idle, which means that they are done inserting - // reactions. Therefore, the reaction queue can be accessed without locking - // a mutex. - - while (scheduler->next_reaction_level <= scheduler->max_reaction_level) { - LF_PRINT_DEBUG("Waiting with curr_reaction_level %zu.", scheduler->next_reaction_level); - try_advance_level(scheduler->env, &scheduler->next_reaction_level); - - tmp_queue = ((pqueue_t**)scheduler->triggered_reactions)[scheduler->next_reaction_level - 1]; - size_t reactions_to_execute = pqueue_size(tmp_queue); - - if (reactions_to_execute) { - scheduler->executing_reactions = tmp_queue; - return reactions_to_execute; - } - } - - return 0; -} - /** * @brief If there is work to be done, notify workers individually. * * This assumes that the caller is not holding any thread mutexes. */ -void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { +static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { // Note: All threads are idle. Therefore, there is no need to lock the mutex // while accessing the executing queue (which is pointing to one of the // reaction queues). @@ -132,7 +103,27 @@ void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { LF_MUTEX_UNLOCK(&env->mutex); } - if (_lf_sched_distribute_ready_reactions(scheduler) > 0) { + int num_reactions = 0; + pqueue_t* tmp_queue = NULL; + // Note: All the worker threads are idle, which means that they are done inserting + // reactions. Therefore, the reaction queue can be accessed without locking + // a mutex. + + while (scheduler->next_reaction_level <= scheduler->max_reaction_level) { + LF_PRINT_DEBUG("Waiting with curr_reaction_level %zu.", scheduler->next_reaction_level); + try_advance_level(scheduler->env, &scheduler->next_reaction_level); + + tmp_queue = ((pqueue_t**)scheduler->triggered_reactions)[scheduler->next_reaction_level - 1]; + size_t reactions_to_execute = pqueue_size(tmp_queue); + + if (reactions_to_execute) { + scheduler->executing_reactions = tmp_queue; + num_reactions = reactions_to_execute; + break; + } + } + + if (num_reactions > 0) { _lf_sched_notify_workers(scheduler); break; } @@ -149,7 +140,7 @@ void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { * @param worker_number The worker number of the worker thread asking for work * to be assigned to it. */ -void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { +static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { // Increment the number of idle workers by 1 and check if this is the last // worker thread to become idle. if (((size_t)lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1)) == @@ -161,8 +152,7 @@ void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { } else { // Not the last thread to become idle. // Wait for work to be released. - LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling " - "semaphore.", + LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling semaphore.", worker_number); lf_semaphore_acquire(scheduler->semaphore); LF_PRINT_DEBUG("Scheduler: Worker %zu acquired the scheduling semaphore.", worker_number); @@ -197,6 +187,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* scheduler->array_of_mutexes = (lf_mutex_t*)calloc(1, sizeof(lf_mutex_t)); // Initialize the reaction queue. + size_t queue_size = INITIAL_REACT_QUEUE_SIZE; ((pqueue_t**)scheduler->triggered_reactions)[0] = pqueue_init(queue_size, in_reverse_order, get_reaction_index, get_reaction_position, set_reaction_position, reaction_matches, print_reaction); diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 01b510477..678eb4daf 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -80,7 +80,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction * * @return 1 if any reaction is ready. 0 otherwise. */ -int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { +static int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { // Note: All the threads are idle, which means that they are done inserting // reactions. Therefore, the reaction vectors can be accessed without // locking a mutex. @@ -107,7 +107,7 @@ int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { * * This assumes that the caller is not holding any thread mutexes. */ -void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { +static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { // Calculate the number of workers that we need to wake up, which is the // number of reactions enabled at this level. // Note: All threads are idle. Therefore, there is no need to lock the mutex while accessing the index for the @@ -182,7 +182,7 @@ void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { * @param worker_number The worker number of the worker thread asking for work * to be assigned to it. */ -void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { +static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { // Increment the number of idle workers by 1 and check if this is the last // worker thread to become idle. if (lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) { From 9ca8ddd9804120a5fd04810a42bf0621b50f6625 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Tue, 21 May 2024 17:57:30 +0200 Subject: [PATCH 03/36] First pass on simpler GEDF --- core/threaded/reactor_threaded.c | 2 +- core/threaded/scheduler_GEDF_NP.c | 263 ++++++++------------ core/threaded/scheduler_NP.c | 4 +- low_level_platform/api/low_level_platform.h | 2 +- 4 files changed, 109 insertions(+), 162 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 57f888fc2..b43d5e851 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -862,7 +862,7 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) /** * The main looping logic of each LF worker thread. - * This function assumes the caller holds the mutex lock. + * This function assumes the caller does not hold the mutex lock on the environment. * * @param env Environment within which we are executing. * @param worker_number The number assigned to this worker thread diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 04e84e6d5..bf5fc2990 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -25,139 +25,18 @@ #include "scheduler_instance.h" #include "scheduler_sync_tag_advance.h" #include "scheduler.h" -#include "lf_semaphore.h" #include "tracepoint.h" #include "util.h" -/////////////////// Scheduler Private API ///////////////////////// -/** - * @brief Insert 'reaction' into scheduler->triggered_reactions, the reaction queue. - * @param reaction The reaction to insert. - */ -static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction_t* reaction) { - LF_PRINT_DEBUG("Scheduler: Locking mutex for reaction queue."); - LF_MUTEX_LOCK(&scheduler->array_of_mutexes[0]); - LF_PRINT_DEBUG("Scheduler: Locked mutex for reaction queue."); - pqueue_insert(((pqueue_t**)scheduler->triggered_reactions)[0], (void*)reaction); - LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[0]); -} - -/** - * @brief If there is work to be done, notify workers individually. - * - * This assumes that the caller is not holding any thread mutexes. - */ -static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { - // Note: All threads are idle. Therefore, there is no need to lock the mutex - // while accessing the executing queue (which is pointing to one of the - // reaction queues). - size_t workers_to_awaken = - LF_MIN(scheduler->number_of_idle_workers, pqueue_size((pqueue_t*)scheduler->executing_reactions)); - LF_PRINT_DEBUG("Scheduler: Notifying %zu workers.", workers_to_awaken); - scheduler->number_of_idle_workers -= workers_to_awaken; - LF_PRINT_DEBUG("Scheduler: New number of idle workers: %zu.", scheduler->number_of_idle_workers); - if (workers_to_awaken > 1) { - // Notify all the workers except the worker thread that has called this - // function. - lf_semaphore_release(scheduler->semaphore, (workers_to_awaken - 1)); - } -} - -/** - * @brief Signal all worker threads that it is time to stop. - * - */ -void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { - scheduler->should_stop = true; - lf_semaphore_release(scheduler->semaphore, (scheduler->number_of_workers - 1)); -} - -/** - * @brief Advance tag or distribute reactions to worker threads. - * - * Advance tag if there are no reactions on the reaction queue. If - * there are such reactions, distribute them to worker threads. - * - * This function assumes the caller does not hold the 'mutex' lock. - */ -void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { - environment_t* env = scheduler->env; - - // Executing queue must be empty when this is called. - assert(pqueue_size((pqueue_t*)scheduler->executing_reactions) == 0); - - // Loop until it's time to stop or work has been distributed - while (true) { - if (scheduler->next_reaction_level == (scheduler->max_reaction_level + 1)) { - scheduler->next_reaction_level = 0; - LF_MUTEX_LOCK(&env->mutex); - // Nothing more happening at this tag. - LF_PRINT_DEBUG("Scheduler: Advancing tag."); - // This worker thread will take charge of advancing tag. - if (_lf_sched_advance_tag_locked(scheduler)) { - LF_PRINT_DEBUG("Scheduler: Reached stop tag."); - _lf_sched_signal_stop(scheduler); - LF_MUTEX_UNLOCK(&env->mutex); - break; - } - LF_MUTEX_UNLOCK(&env->mutex); - } - - int num_reactions = 0; - pqueue_t* tmp_queue = NULL; - // Note: All the worker threads are idle, which means that they are done inserting - // reactions. Therefore, the reaction queue can be accessed without locking - // a mutex. - - while (scheduler->next_reaction_level <= scheduler->max_reaction_level) { - LF_PRINT_DEBUG("Waiting with curr_reaction_level %zu.", scheduler->next_reaction_level); - try_advance_level(scheduler->env, &scheduler->next_reaction_level); - - tmp_queue = ((pqueue_t**)scheduler->triggered_reactions)[scheduler->next_reaction_level - 1]; - size_t reactions_to_execute = pqueue_size(tmp_queue); +// Data specific to the GEDF scheduler. +typedef struct custom_scheduler_data_t { + pqueue_t* reaction_q; + lf_cond_t reaction_q_changed; + size_t current_level; +} custom_scheduler_data_t; - if (reactions_to_execute) { - scheduler->executing_reactions = tmp_queue; - num_reactions = reactions_to_execute; - break; - } - } - - if (num_reactions > 0) { - _lf_sched_notify_workers(scheduler); - break; - } - } -} +/////////////////// Scheduler Private API ///////////////////////// -/** - * @brief Wait until the scheduler assigns work. - * - * If the calling worker thread is the last to become idle, it will call on the - * scheduler to distribute work. Otherwise, it will wait on - * 'scheduler->semaphore'. - * - * @param worker_number The worker number of the worker thread asking for work - * to be assigned to it. - */ -static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { - // Increment the number of idle workers by 1 and check if this is the last - // worker thread to become idle. - if (((size_t)lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1)) == - scheduler->number_of_workers) { - // Last thread to go idle - LF_PRINT_DEBUG("Scheduler: Worker %zu is the last idle thread.", worker_number); - // Call on the scheduler to distribute work or advance tag. - _lf_scheduler_try_advance_tag_and_distribute(scheduler); - } else { - // Not the last thread to become idle. - // Wait for work to be released. - LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling semaphore.", - worker_number); - lf_semaphore_acquire(scheduler->semaphore); - LF_PRINT_DEBUG("Scheduler: Worker %zu acquired the scheduling semaphore.", worker_number); - } -} ///////////////////// Scheduler Init and Destroy API ///////////////////////// /** @@ -186,15 +65,17 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* scheduler->triggered_reactions = calloc(1, sizeof(pqueue_t*)); scheduler->array_of_mutexes = (lf_mutex_t*)calloc(1, sizeof(lf_mutex_t)); + scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); + // Initialize the reaction queue. size_t queue_size = INITIAL_REACT_QUEUE_SIZE; - ((pqueue_t**)scheduler->triggered_reactions)[0] = + scheduler->custom_data->reaction_q = pqueue_init(queue_size, in_reverse_order, get_reaction_index, get_reaction_position, set_reaction_position, reaction_matches, print_reaction); - // Initialize the mutexes for the reaction queues - LF_MUTEX_INIT(&scheduler->array_of_mutexes[0]); - scheduler->executing_reactions = ((pqueue_t**)scheduler->triggered_reactions)[0]; + LF_COND_INIT(&scheduler->custom_data->reaction_q_changed, &env->mutex); + + scheduler->custom_data->current_level = 0; } /** @@ -207,8 +88,8 @@ void lf_sched_free(lf_scheduler_t* scheduler) { // pqueue_free(scheduler->triggered_reactions[j]); // FIXME: This is causing weird memory errors. // } - pqueue_free((pqueue_t*)scheduler->executing_reactions); - lf_semaphore_destroy(scheduler->semaphore); + pqueue_free((pqueue_t*)scheduler->custom_data->reaction_q); + free(scheduler->custom_data); } ///////////////////// Scheduler Worker API (public) ///////////////////////// @@ -218,36 +99,92 @@ void lf_sched_free(lf_scheduler_t* scheduler) { * This function blocks until it can return a ready reaction for worker thread * 'worker_number' or it is time for the worker thread to stop and exit (where a * NULL value would be returned). - * - * @param worker_number - * @return reaction_t* A reaction for the worker to execute. NULL if the calling - * worker thread should exit. + * + * This function assumes that the environment mutex is not locked. + * @param scheduler The scheduler instance. + * @param worker_number The worker number. + * @return A reaction for the worker to execute. NULL if the calling worker thread should exit. */ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { - // Iterate until the stop_tag is reached or reaction queue is empty - while (!scheduler->should_stop) { - // Need to lock the mutex for the current level - size_t current_level = scheduler->next_reaction_level - 1; - LF_PRINT_DEBUG("Scheduler: Worker %d locking reaction queue mutex.", worker_number); - LF_MUTEX_LOCK(&scheduler->array_of_mutexes[0]); - LF_PRINT_DEBUG("Scheduler: Worker %d locked reaction queue mutex.", worker_number); - reaction_t* reaction_to_return = (reaction_t*)pqueue_pop((pqueue_t*)scheduler->executing_reactions); - LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[0]); + + // Need to lock the environment mutex. + LF_PRINT_DEBUG("Scheduler: Worker %d locking environment mutex.", worker_number); + LF_MUTEX_LOCK(&scheduler->env->mutex); + LF_PRINT_DEBUG("Scheduler: Worker %d locked environment mutex.", worker_number); + // Iterate until the stop_tag is reached or the event queue is empty. + while (!scheduler->should_stop) { + reaction_t* reaction_to_return = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q); if (reaction_to_return != NULL) { - // Got a reaction - return reaction_to_return; + // Found a reaction. Check the level. Notice that because of deadlines, the current level + // may advance to the maximum and then back down to 0. + if (LF_LEVEL(reaction_to_return->index) == scheduler->custom_data->current_level) { + // Found a reaction at the current level. + LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %zu.", + worker_number, scheduler->custom_data->current_level); + // Remove the reaction from the queue. + pqueue_pop(scheduler->custom_data->reaction_q); + LF_MUTEX_UNLOCK(&scheduler->env->mutex); + return reaction_to_return; + } else { + // Found a reaction at a level other than the current level. + LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %lld. Current level is %zu", + worker_number, LF_LEVEL(reaction_to_return->index), scheduler->custom_data->current_level); + // We need to wait to advance to the next level or get a new reaction at the current level. + if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) { + // All other workers are idle. Advance to the next level. + try_advance_level(scheduler->env, &scheduler->custom_data->current_level); + if (scheduler->custom_data->current_level > scheduler->max_reaction_level) { + // Since the reaction queue is not empty, we must be cycling back to level 0 due to deadlines + // having been given precedence over levels. Reset the next level to 1. + scheduler->custom_data->current_level = 0; + } + LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", + scheduler->custom_data->current_level); + // Notify other workers that we are at the next level. + LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); + } else { + // Some workers are still working on reactions on the current level. + // Wait for them to finish. + scheduler->number_of_idle_workers++; + tracepoint_worker_wait_starts(scheduler->env, worker_number); + LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); + tracepoint_worker_wait_ends(scheduler->env, worker_number); + scheduler->number_of_idle_workers--; + } + } + } else { + // The reaction queue is empty. + LF_PRINT_DEBUG("Worker %d finds nothing on the reaction queue.", worker_number); + + // If all other workers are idle, then we are done with this tag. + if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) { + // Last thread to go idle + LF_PRINT_DEBUG("Scheduler: Worker %d is advancing the tag.", worker_number); + // Advance the tag. + scheduler->custom_data->current_level = 0; + if (_lf_sched_advance_tag_locked(scheduler)) { + LF_PRINT_DEBUG("Scheduler: Reached stop tag."); + scheduler->should_stop = true; + LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); + break; + } + try_advance_level(scheduler->env, &scheduler->custom_data->current_level); + LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); + } else { + // Some other workers are still working on reactions on the current level. + // Wait for them to finish. + scheduler->number_of_idle_workers++; + tracepoint_worker_wait_starts(scheduler->env, worker_number); + LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); + tracepoint_worker_wait_ends(scheduler->env, worker_number); + scheduler->number_of_idle_workers--; + } } - - LF_PRINT_DEBUG("Worker %d is out of ready reactions.", worker_number); - - // Ask the scheduler for more work and wait - tracepoint_worker_wait_starts(scheduler->env, worker_number); - _lf_sched_wait_for_work(scheduler, worker_number); - tracepoint_worker_wait_ends(scheduler->env, worker_number); } // It's time for the worker thread to stop and exit. + LF_MUTEX_UNLOCK(&scheduler->env->mutex); return NULL; } @@ -283,11 +220,21 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction * worker number does not make sense (e.g., the caller is not a worker thread). */ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { - (void)worker_number; + (void)worker_number; // Suppress unused parameter warning. if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) { return; } LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index)); - _lf_sched_insert_reaction(scheduler, reaction); + + // FIXME: Mutex not needed when pulling from the event queue. + LF_PRINT_DEBUG("Scheduler: Locking mutex for environment."); + LF_MUTEX_LOCK(&scheduler->env->mutex); + LF_PRINT_DEBUG("Scheduler: Locked mutex for environment."); + + pqueue_insert(scheduler->custom_data->reaction_q, (void*)reaction); + // Notify any idle workers of the new reaction. + LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); + + LF_MUTEX_UNLOCK(&scheduler->env->mutex); } #endif // SCHEDULER == SCHED_GEDF_NP diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 678eb4daf..c9388c294 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -130,7 +130,7 @@ static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { * @brief Signal all worker threads that it is time to stop. * */ -void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { +static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { scheduler->should_stop = true; lf_semaphore_release(scheduler->semaphore, (scheduler->number_of_workers - 1)); } @@ -143,7 +143,7 @@ void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { * * This function assumes the caller does not hold the 'mutex' lock. */ -void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { +static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { // Reset the index environment_t* env = scheduler->env; scheduler->indexes[scheduler->next_reaction_level - 1] = 0; diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index 8494a4d85..409f4a214 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -232,7 +232,7 @@ int lf_cond_signal(lf_cond_t* cond); /** * Wait for condition variable "cond" to be signaled or broadcast. - * "mutex" is assumed to be locked before. + * The cond->mutex is assumed to be locked when this is called. * * @return 0 on success, platform-specific error number otherwise. */ From 2df85de53279f8c2b17396be3f1e05778a095930 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 00:30:34 +0200 Subject: [PATCH 04/36] Reduce notifications --- core/threaded/reactor_threaded.c | 6 ++- core/threaded/scheduler_GEDF_NP.c | 87 ++++++++++++++----------------- include/core/threaded/scheduler.h | 1 + 3 files changed, 44 insertions(+), 50 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index b43d5e851..559e4bda9 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -861,7 +861,11 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) } /** - * The main looping logic of each LF worker thread. + * @brief The main looping logic of each LF worker thread. + * + * This function returns when the scheduler's lf_sched_get_ready_reaction() + * implementation returns NULL, indicating that there are no more reactions to execute. + * * This function assumes the caller does not hold the mutex lock on the environment. * * @param env Environment within which we are executing. diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index bf5fc2990..c72ff0997 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -33,6 +33,7 @@ typedef struct custom_scheduler_data_t { pqueue_t* reaction_q; lf_cond_t reaction_q_changed; size_t current_level; + bool solo_holds_mutex; // Indicates sole thread holds the mutex. } custom_scheduler_data_t; /////////////////// Scheduler Private API ///////////////////////// @@ -93,20 +94,8 @@ void lf_sched_free(lf_scheduler_t* scheduler) { } ///////////////////// Scheduler Worker API (public) ///////////////////////// -/** - * @brief Ask the scheduler for one more reaction. - * - * This function blocks until it can return a ready reaction for worker thread - * 'worker_number' or it is time for the worker thread to stop and exit (where a - * NULL value would be returned). - * - * This function assumes that the environment mutex is not locked. - * @param scheduler The scheduler instance. - * @param worker_number The worker number. - * @return A reaction for the worker to execute. NULL if the calling worker thread should exit. - */ + reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { - // Need to lock the environment mutex. LF_PRINT_DEBUG("Scheduler: Worker %d locking environment mutex.", worker_number); LF_MUTEX_LOCK(&scheduler->env->mutex); @@ -124,6 +113,16 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu worker_number, scheduler->custom_data->current_level); // Remove the reaction from the queue. pqueue_pop(scheduler->custom_data->reaction_q); + + // If there is another reaction at the current level and an idle thread, then + // notify an idle thread. + reaction_t* next_reaction = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q); + if (next_reaction != NULL + && LF_LEVEL(next_reaction->index) == scheduler->custom_data->current_level + && scheduler->number_of_idle_workers > 0) { + // Notify an idle thread. + LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed); + } LF_MUTEX_UNLOCK(&scheduler->env->mutex); return reaction_to_return; } else { @@ -141,8 +140,6 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu } LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", scheduler->custom_data->current_level); - // Notify other workers that we are at the next level. - LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); } else { // Some workers are still working on reactions on the current level. // Wait for them to finish. @@ -163,14 +160,19 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu LF_PRINT_DEBUG("Scheduler: Worker %d is advancing the tag.", worker_number); // Advance the tag. scheduler->custom_data->current_level = 0; + // Set a flag in the scheduler that the lock is held by the sole executing thread. + // This prevents acquiring the mutex in lf_scheduler_trigger_reaction. + scheduler->custom_data->solo_holds_mutex = true; if (_lf_sched_advance_tag_locked(scheduler)) { LF_PRINT_DEBUG("Scheduler: Reached stop tag."); scheduler->should_stop = true; + scheduler->custom_data->solo_holds_mutex = false; + // Notify all threads that the stop tag has been reached. LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); break; } + scheduler->custom_data->solo_holds_mutex = false; try_advance_level(scheduler->env, &scheduler->custom_data->current_level); - LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); } else { // Some other workers are still working on reactions on the current level. // Wait for them to finish. @@ -188,37 +190,13 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu return NULL; } -/** - * @brief Inform the scheduler that worker thread 'worker_number' is done - * executing the 'done_reaction'. - * - * @param worker_number The worker number for the worker thread that has - * finished executing 'done_reaction'. - * @param done_reaction The reaction that is done. - */ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) { - (void)worker_number; + (void)worker_number; // Suppress unused parameter warning. if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) { lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued); } } -/** - * @brief Inform the scheduler that worker thread 'worker_number' would like to - * trigger 'reaction' at the current tag. - * - * If a worker number is not available (e.g., this function is not called by a - * worker thread), -1 should be passed as the 'worker_number'. - * - * The scheduler will ensure that the same reaction is not triggered twice in - * the same tag. - * - * @param reaction The reaction to trigger at the current tag. - * @param worker_number The ID of the worker that is making this call. 0 should - * be used if there is only one worker (e.g., when the program is using the - * single-threaded C runtime). -1 is used for an anonymous call in a context where a - * worker number does not make sense (e.g., the caller is not a worker thread). - */ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { (void)worker_number; // Suppress unused parameter warning. if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) { @@ -226,15 +204,26 @@ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reacti } LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index)); - // FIXME: Mutex not needed when pulling from the event queue. - LF_PRINT_DEBUG("Scheduler: Locking mutex for environment."); - LF_MUTEX_LOCK(&scheduler->env->mutex); - LF_PRINT_DEBUG("Scheduler: Locked mutex for environment."); - + // Mutex not needed when pulling from the event queue. + if (!scheduler->custom_data->solo_holds_mutex) { + LF_PRINT_DEBUG("Scheduler: Locking mutex for environment."); + LF_MUTEX_LOCK(&scheduler->env->mutex); + LF_PRINT_DEBUG("Scheduler: Locked mutex for environment."); + } pqueue_insert(scheduler->custom_data->reaction_q, (void*)reaction); - // Notify any idle workers of the new reaction. - LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); + if (!scheduler->custom_data->solo_holds_mutex) { + // If this is called from a reaction execution, then the triggered reaction + // has one level higher than the current level. No need to notify idle threads. + // But in federated execution, it could be called because of message arrival. + // Also, in modal models, reset and startup reactions may be triggered. +#if defined(FEDERATED) || defined(MODAL) + reaction_t* triggered_reaction = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q); + if (LF_LEVEL(triggered_reaction->index) == scheduler->custom_data->current_level) { + LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed); + } +#endif // FEDERATED || MODAL - LF_MUTEX_UNLOCK(&scheduler->env->mutex); + LF_MUTEX_UNLOCK(&scheduler->env->mutex); + } } #endif // SCHEDULER == SCHED_GEDF_NP diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h index ea9f008c2..44997bc74 100644 --- a/include/core/threaded/scheduler.h +++ b/include/core/threaded/scheduler.h @@ -76,6 +76,7 @@ void lf_sched_free(lf_scheduler_t* scheduler); * This function blocks until it can return a ready reaction for worker thread * 'worker_number' or it is time for the worker thread to stop and exit (where a * NULL value would be returned). + * This function assumes that the environment mutex is not locked. * * @param scheduler The scheduler * @param worker_number For the calling worker thread. From 7a001d45fc4f116213ae45ab4dfa2b410fa63395 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 10:03:06 +0200 Subject: [PATCH 05/36] Fix level error and refactor --- core/federated/federate.c | 11 ++-- core/threaded/scheduler_GEDF_NP.c | 43 +++++++++------ core/threaded/scheduler_sync_tag_advance.c | 54 +++---------------- include/core/federated/federate.h | 7 +++ .../threaded/scheduler_sync_tag_advance.h | 53 +++++++++--------- 5 files changed, 75 insertions(+), 93 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 38d69f167..e3e4dbc61 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2611,14 +2611,19 @@ void lf_set_federation_id(const char* fid) { federation_metadata.federation_id = void lf_spawn_staa_thread() { lf_thread_create(&_fed.staaSetter, update_ports_from_staa_offsets, NULL); } #endif // FEDERATED_DECENTRALIZED -void lf_stall_advance_level_federation(environment_t* env, size_t level) { - LF_PRINT_DEBUG("Acquiring the environment mutex."); - LF_MUTEX_LOCK(&env->mutex); +void lf_stall_advance_level_federation_locked(environment_t* env, size_t level) { LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance); while (((int)level) >= max_level_allowed_to_advance) { lf_cond_wait(&lf_port_status_changed); }; LF_PRINT_DEBUG("Exiting wait with MLAA %d and next_reaction_level %zu.", max_level_allowed_to_advance, level); +} + +void lf_stall_advance_level_federation(environment_t* env, size_t level) { + LF_PRINT_DEBUG("Acquiring the environment mutex."); + LF_MUTEX_LOCK(&env->mutex); + LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance); + lf_stall_advance_level_federation_locked(env, level); LF_MUTEX_UNLOCK(&env->mutex); } diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index c72ff0997..8520423cd 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -38,6 +38,18 @@ typedef struct custom_scheduler_data_t { /////////////////// Scheduler Private API ///////////////////////// +/** + * @brief Mark the calling thread idle and wait for notification of change to the reaction queue. + * @param scheduler The scheduler. + * @param worker_number The number of the worker thread. + */ +static void inline wait_for_other_workers_to_finish(lf_scheduler_t* scheduler, int worker_number) { + scheduler->number_of_idle_workers++; + tracepoint_worker_wait_starts(scheduler->env, worker_number); + LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); + tracepoint_worker_wait_ends(scheduler->env, worker_number); + scheduler->number_of_idle_workers--; +} ///////////////////// Scheduler Init and Destroy API ///////////////////////// /** @@ -132,22 +144,21 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu // We need to wait to advance to the next level or get a new reaction at the current level. if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) { // All other workers are idle. Advance to the next level. - try_advance_level(scheduler->env, &scheduler->custom_data->current_level); - if (scheduler->custom_data->current_level > scheduler->max_reaction_level) { + if (++scheduler->custom_data->current_level > scheduler->max_reaction_level) { // Since the reaction queue is not empty, we must be cycling back to level 0 due to deadlines - // having been given precedence over levels. Reset the next level to 1. + // having been given precedence over levels. Reset the current level to 1. scheduler->custom_data->current_level = 0; } LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", scheduler->custom_data->current_level); - } else { +#ifdef FEDERATED + // In case there are blocking network input reactions at this level, stall. + lf_stall_advance_level_federation_locked(scheduler->env, scheduler->custom_data->current_level); +#endif + } else { // Some workers are still working on reactions on the current level. // Wait for them to finish. - scheduler->number_of_idle_workers++; - tracepoint_worker_wait_starts(scheduler->env, worker_number); - LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); - tracepoint_worker_wait_ends(scheduler->env, worker_number); - scheduler->number_of_idle_workers--; + wait_for_other_workers_to_finish(scheduler, worker_number); } } } else { @@ -159,7 +170,6 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu // Last thread to go idle LF_PRINT_DEBUG("Scheduler: Worker %d is advancing the tag.", worker_number); // Advance the tag. - scheduler->custom_data->current_level = 0; // Set a flag in the scheduler that the lock is held by the sole executing thread. // This prevents acquiring the mutex in lf_scheduler_trigger_reaction. scheduler->custom_data->solo_holds_mutex = true; @@ -172,15 +182,16 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu break; } scheduler->custom_data->solo_holds_mutex = false; - try_advance_level(scheduler->env, &scheduler->custom_data->current_level); + // Reset the level to 0. + scheduler->custom_data->current_level = 0; +#ifdef FEDERATED + // In case there are blocking network input reactions at this level, stall. + lf_stall_advance_level_federation_locked(scheduler->env, scheduler->custom_data->current_level); +#endif } else { // Some other workers are still working on reactions on the current level. // Wait for them to finish. - scheduler->number_of_idle_workers++; - tracepoint_worker_wait_starts(scheduler->env, worker_number); - LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); - tracepoint_worker_wait_ends(scheduler->env, worker_number); - scheduler->number_of_idle_workers--; + wait_for_other_workers_to_finish(scheduler, worker_number); } } } diff --git a/core/threaded/scheduler_sync_tag_advance.c b/core/threaded/scheduler_sync_tag_advance.c index 1b0556ba1..5b52495a4 100644 --- a/core/threaded/scheduler_sync_tag_advance.c +++ b/core/threaded/scheduler_sync_tag_advance.c @@ -1,63 +1,29 @@ -#if !defined(LF_SINGLE_THREADED) -/************* -Copyright (c) 2022, The University of Texas at Dallas. -Copyright (c) 2022, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** * @file scheduler_sync_tag_advance.c * @author Soroush Bateni (soroush@utdallas.edu) * @author Edward A. Lee * @author Marten Lohstroh * @brief API used to advance tag globally. - * - * @copyright Copyright (c) 2022, The University of Texas at Dallas. - * @copyright Copyright (c) 2022, The University of California at Berkeley. + * @copyright (c) 2020-2024, The University of California at Berkeley and The University of Texas at Dallas + * License: BSD 2-clause */ +#if !defined(LF_SINGLE_THREADED) + #include "scheduler_sync_tag_advance.h" #include "rti_local.h" #include "environment.h" #include "tracepoint.h" #include "util.h" -/////////////////// External Functions ///////////////////////// -/** - * Placeholder for function that will advance tag and initially fill the - * reaction queue. - * - * This does not acquire the mutex lock. It assumes the lock is already held. - */ +// Forward declaration of function defined in reactor_threaded.h +void _lf_next_locked(struct environment_t* env); /** * @brief Indicator that execution of at least one tag has completed. */ static bool _latest_tag_completed = false; -/** - * Return true if the worker should stop now; false otherwise. - * This function assumes the caller holds the mutex lock. - */ bool should_stop_locked(lf_scheduler_t* sched) { // If this is not the very first step, check against the stop tag to see whether this is the last step. if (_latest_tag_completed) { @@ -70,14 +36,6 @@ bool should_stop_locked(lf_scheduler_t* sched) { return false; } -/** - * Advance tag. This will also pop events for the newly acquired tag and put - * the triggered reactions on the '_lf_sched_vector_of_reaction_qs'. - * - * This function assumes the caller holds the 'mutex' lock. - * - * @return should_exit True if the worker thread should exit. False otherwise. - */ bool _lf_sched_advance_tag_locked(lf_scheduler_t* sched) { environment_t* env = sched->env; logical_tag_complete(env->current_tag); diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 1c1028c23..b154b869e 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -496,6 +496,13 @@ void lf_spawn_staa_thread(void); */ void lf_stall_advance_level_federation(environment_t* env, size_t level); +/** + * @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock. + * @param env The environment (which should always be the top-level environment). + * @param level The level to which we would like to advance. + */ +void lf_stall_advance_level_federation_locked(environment_t* env, size_t level); + /** * @brief Synchronize the start with other federates via the RTI. * diff --git a/include/core/threaded/scheduler_sync_tag_advance.h b/include/core/threaded/scheduler_sync_tag_advance.h index 3de92e540..f14785dc8 100644 --- a/include/core/threaded/scheduler_sync_tag_advance.h +++ b/include/core/threaded/scheduler_sync_tag_advance.h @@ -1,27 +1,12 @@ -/************* -Copyright (c) 2022, The University of Texas at Dallas. -Copyright (c) 2022, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ +/** + * @file scheduler_sync_tag_advance.h + * @author Soroush Bateni (soroush@utdallas.edu) + * @author Edward A. Lee + * @author Marten Lohstroh + * @brief API used to advance tag globally. + * @copyright (c) 2020-2024, The University of California at Berkeley and The University of Texas at Dallas + * License: BSD 2-clause + */ #ifndef SCHEDULER_SYNC_TAG_ADVANCE_H #define SCHEDULER_SYNC_TAG_ADVANCE_H @@ -31,8 +16,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "tag.h" #include "scheduler_instance.h" -/////////////////// External Functions ///////////////////////// -void _lf_next_locked(struct environment_t* env); /** * Placeholder for code-generated function that will, in a federated * execution, be used to coordinate the advancement of tag. It will notify @@ -42,7 +25,25 @@ void _lf_next_locked(struct environment_t* env); * @param tag_to_send The tag to send. */ void logical_tag_complete(tag_t tag_to_send); + +/** + * @brief Return true if the worker should stop now; false otherwise. + * + * This function assumes the caller holds the mutex lock. + * @param sched The scheduler instance to check. + */ bool should_stop_locked(lf_scheduler_t* sched); + +/** + * @brief Advance the tag to the next tag on the event queue + * + * This will also pop events for the newly acquired tag and trigger + * the enabled reactions using the scheduler. + * + * This function assumes the caller holds the environment mutex lock. + * @param sched The scheduler instance to check. + * @return True if the worker thread should exit. False otherwise. + */ bool _lf_sched_advance_tag_locked(lf_scheduler_t* sched); #endif // LF_C11_THREADS_SUPPORT_H From 58a24ec53df10c966701eff7699c4fd91683357e Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 10:06:25 +0200 Subject: [PATCH 06/36] Format --- core/threaded/reactor_threaded.c | 4 ++-- core/threaded/scheduler_GEDF_NP.c | 22 +++++++++---------- include/core/threaded/reactor_threaded.h | 2 +- .../threaded/scheduler_sync_tag_advance.h | 4 ++-- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 559e4bda9..6915160e9 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -862,10 +862,10 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) /** * @brief The main looping logic of each LF worker thread. - * + * * This function returns when the scheduler's lf_sched_get_ready_reaction() * implementation returns NULL, indicating that there are no more reactions to execute. - * + * * This function assumes the caller does not hold the mutex lock on the environment. * * @param env Environment within which we are executing. diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 8520423cd..414b4e208 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -118,20 +118,19 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu reaction_t* reaction_to_return = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q); if (reaction_to_return != NULL) { // Found a reaction. Check the level. Notice that because of deadlines, the current level - // may advance to the maximum and then back down to 0. + // may advance to the maximum and then back down to 0. if (LF_LEVEL(reaction_to_return->index) == scheduler->custom_data->current_level) { // Found a reaction at the current level. - LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %zu.", - worker_number, scheduler->custom_data->current_level); + LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %zu.", worker_number, + scheduler->custom_data->current_level); // Remove the reaction from the queue. pqueue_pop(scheduler->custom_data->reaction_q); // If there is another reaction at the current level and an idle thread, then // notify an idle thread. reaction_t* next_reaction = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q); - if (next_reaction != NULL - && LF_LEVEL(next_reaction->index) == scheduler->custom_data->current_level - && scheduler->number_of_idle_workers > 0) { + if (next_reaction != NULL && LF_LEVEL(next_reaction->index) == scheduler->custom_data->current_level && + scheduler->number_of_idle_workers > 0) { // Notify an idle thread. LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed); } @@ -139,8 +138,8 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu return reaction_to_return; } else { // Found a reaction at a level other than the current level. - LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %lld. Current level is %zu", - worker_number, LF_LEVEL(reaction_to_return->index), scheduler->custom_data->current_level); + LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %lld. Current level is %zu", worker_number, + LF_LEVEL(reaction_to_return->index), scheduler->custom_data->current_level); // We need to wait to advance to the next level or get a new reaction at the current level. if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) { // All other workers are idle. Advance to the next level. @@ -149,13 +148,12 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu // having been given precedence over levels. Reset the current level to 1. scheduler->custom_data->current_level = 0; } - LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", - scheduler->custom_data->current_level); + LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", scheduler->custom_data->current_level); #ifdef FEDERATED // In case there are blocking network input reactions at this level, stall. lf_stall_advance_level_federation_locked(scheduler->env, scheduler->custom_data->current_level); #endif - } else { + } else { // Some workers are still working on reactions on the current level. // Wait for them to finish. wait_for_other_workers_to_finish(scheduler, worker_number); @@ -209,7 +207,7 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction } void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { - (void)worker_number; // Suppress unused parameter warning. + (void)worker_number; // Suppress unused parameter warning. if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) { return; } diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 4fa9b07a0..fa3513319 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -14,7 +14,7 @@ /** * @brief Advance to the next level. - * + * * For federated runtimes, this function should * stall the advance until we know that we can safely execute the next level * given knowledge about upstream network port statuses. diff --git a/include/core/threaded/scheduler_sync_tag_advance.h b/include/core/threaded/scheduler_sync_tag_advance.h index f14785dc8..fe0390f86 100644 --- a/include/core/threaded/scheduler_sync_tag_advance.h +++ b/include/core/threaded/scheduler_sync_tag_advance.h @@ -28,7 +28,7 @@ void logical_tag_complete(tag_t tag_to_send); /** * @brief Return true if the worker should stop now; false otherwise. - * + * * This function assumes the caller holds the mutex lock. * @param sched The scheduler instance to check. */ @@ -36,7 +36,7 @@ bool should_stop_locked(lf_scheduler_t* sched); /** * @brief Advance the tag to the next tag on the event queue - * + * * This will also pop events for the newly acquired tag and trigger * the enabled reactions using the scheduler. * From 9a34786cdd06d25516877c6be1c009c1e44adb9e Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 10:29:21 +0200 Subject: [PATCH 07/36] Removed unused variable --- core/federated/federate.c | 4 ++-- core/threaded/scheduler_GEDF_NP.c | 4 ++-- include/core/federated/federate.h | 3 +-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index e3e4dbc61..cf0608eb0 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2611,7 +2611,7 @@ void lf_set_federation_id(const char* fid) { federation_metadata.federation_id = void lf_spawn_staa_thread() { lf_thread_create(&_fed.staaSetter, update_ports_from_staa_offsets, NULL); } #endif // FEDERATED_DECENTRALIZED -void lf_stall_advance_level_federation_locked(environment_t* env, size_t level) { +void lf_stall_advance_level_federation_locked(size_t level) { LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance); while (((int)level) >= max_level_allowed_to_advance) { lf_cond_wait(&lf_port_status_changed); @@ -2623,7 +2623,7 @@ void lf_stall_advance_level_federation(environment_t* env, size_t level) { LF_PRINT_DEBUG("Acquiring the environment mutex."); LF_MUTEX_LOCK(&env->mutex); LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance); - lf_stall_advance_level_federation_locked(env, level); + lf_stall_advance_level_federation_locked(level); LF_MUTEX_UNLOCK(&env->mutex); } diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 414b4e208..14950487f 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -151,7 +151,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", scheduler->custom_data->current_level); #ifdef FEDERATED // In case there are blocking network input reactions at this level, stall. - lf_stall_advance_level_federation_locked(scheduler->env, scheduler->custom_data->current_level); + lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level); #endif } else { // Some workers are still working on reactions on the current level. @@ -184,7 +184,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu scheduler->custom_data->current_level = 0; #ifdef FEDERATED // In case there are blocking network input reactions at this level, stall. - lf_stall_advance_level_federation_locked(scheduler->env, scheduler->custom_data->current_level); + lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level); #endif } else { // Some other workers are still working on reactions on the current level. diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index b154b869e..230f3e277 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -498,10 +498,9 @@ void lf_stall_advance_level_federation(environment_t* env, size_t level); /** * @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock. - * @param env The environment (which should always be the top-level environment). * @param level The level to which we would like to advance. */ -void lf_stall_advance_level_federation_locked(environment_t* env, size_t level); +void lf_stall_advance_level_federation_locked(size_t level); /** * @brief Synchronize the start with other federates via the RTI. From 12aa9caa6f9ec6ea537e468426ccdbd1c09be5af Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 10:45:57 +0200 Subject: [PATCH 08/36] Put inline declaration first --- core/threaded/scheduler_GEDF_NP.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 14950487f..3d6cdfbac 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -43,7 +43,7 @@ typedef struct custom_scheduler_data_t { * @param scheduler The scheduler. * @param worker_number The number of the worker thread. */ -static void inline wait_for_other_workers_to_finish(lf_scheduler_t* scheduler, int worker_number) { +inline static void wait_for_other_workers_to_finish(lf_scheduler_t* scheduler, int worker_number) { scheduler->number_of_idle_workers++; tracepoint_worker_wait_starts(scheduler->env, worker_number); LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); From 60be3d188072986ee61de4650541a4e084d37a0d Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 11:14:10 +0200 Subject: [PATCH 09/36] Include federate.h conditionally --- core/threaded/scheduler_GEDF_NP.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 3d6cdfbac..e1f583205 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -28,6 +28,10 @@ #include "tracepoint.h" #include "util.h" +#ifdef FEDERATED +#include "federate.h" +#endif + // Data specific to the GEDF scheduler. typedef struct custom_scheduler_data_t { pqueue_t* reaction_q; From 94e377c07a468cae67094f4c5905b06ade75ab03 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 19:10:31 +0200 Subject: [PATCH 10/36] Removed unused function --- core/reactor.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/reactor.c b/core/reactor.c index 00df9e07f..06a031bbd 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -286,12 +286,6 @@ void lf_request_stop(void) { lf_set_stop_tag(env, new_stop_tag); } -/** - * Return false. - * @param reaction The reaction. - */ -bool _lf_is_blocked_by_executing_reaction(void) { return false; } - /** * The main loop of the LF program. * From ccf9385e7118989834b438ac5b357e46ba01ca17 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 19:10:55 +0200 Subject: [PATCH 11/36] Use cutsom_data field for executing_reactions --- core/threaded/scheduler_NP.c | 24 ++++++++++++++-------- include/core/threaded/scheduler_instance.h | 5 ----- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index c9388c294..c585f38fe 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -27,7 +27,13 @@ #include "util.h" #include "reactor_threaded.h" +// Data specific to the NP scheduler. +typedef struct custom_scheduler_data_t { + reaction_t** executing_reactions; +} custom_scheduler_data_t; + /////////////////// Scheduler Private API ///////////////////////// + /** * @brief Insert 'reaction' into * scheduler->triggered_reactions at the appropriate level. @@ -88,12 +94,12 @@ static int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { LF_PRINT_DEBUG("Waiting with curr_reaction_level %zu.", scheduler->next_reaction_level); try_advance_level(scheduler->env, &scheduler->next_reaction_level); - scheduler->executing_reactions = + scheduler->custom_data->executing_reactions = (void*)((reaction_t***)scheduler->triggered_reactions)[scheduler->next_reaction_level - 1]; LF_PRINT_DEBUG("Start of rxn queue at %zu is %p", scheduler->next_reaction_level - 1, - (void*)((reaction_t**)scheduler->executing_reactions)[0]); - if (((reaction_t**)scheduler->executing_reactions)[0] != NULL) { + (void*)((reaction_t**)scheduler->custom_data->executing_reactions)[0]); + if (((reaction_t**)scheduler->custom_data->executing_reactions)[0] != NULL) { // There is at least one reaction to execute return 1; } @@ -201,6 +207,7 @@ static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_num } ///////////////////// Scheduler Init and Destroy API ///////////////////////// + /** * @brief Initialize the scheduler. * @@ -235,6 +242,8 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* env->scheduler->triggered_reactions = calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**)); + env->scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); + env->scheduler->array_of_mutexes = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); env->scheduler->indexes = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); @@ -254,8 +263,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* // Initialize the mutexes for the reaction vectors LF_MUTEX_INIT(&env->scheduler->array_of_mutexes[i]); } - - env->scheduler->executing_reactions = (void*)((reaction_t***)env->scheduler->triggered_reactions)[0]; + env->scheduler->custom_data->executing_reactions = ((reaction_t***)env->scheduler->triggered_reactions)[0]; } /** @@ -270,7 +278,7 @@ void lf_sched_free(lf_scheduler_t* scheduler) { } free(scheduler->triggered_reactions); } - + free(scheduler->custom_data); lf_semaphore_destroy(scheduler->semaphore); } @@ -302,8 +310,8 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu LF_PRINT_DEBUG("Scheduler: Worker %d popping reaction with level %zu, index " "for level: %d.", worker_number, current_level, current_level_q_index); - reaction_to_return = ((reaction_t**)scheduler->executing_reactions)[current_level_q_index]; - ((reaction_t**)scheduler->executing_reactions)[current_level_q_index] = NULL; + reaction_to_return = scheduler->custom_data->executing_reactions[current_level_q_index]; + scheduler->custom_data->executing_reactions[current_level_q_index] = NULL; } #ifdef FEDERATED lf_mutex_unlock(&scheduler->array_of_mutexes[current_level]); diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index fa6255af3..0f1f8519e 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -105,11 +105,6 @@ typedef struct lf_scheduler_t { */ volatile int* indexes; - /** - * @brief Hold currently executing reactions. - */ - void* executing_reactions; - /** * @brief Hold reactions temporarily. */ From 3abd387b79023175f0691d3d40c454de06c383ae Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 19:17:18 +0200 Subject: [PATCH 12/36] Moved array_of_mutexes to custom_data --- core/threaded/scheduler_GEDF_NP.c | 1 - core/threaded/scheduler_NP.c | 14 ++++++++------ include/core/threaded/scheduler_instance.h | 8 -------- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index e1f583205..da63aed61 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -80,7 +80,6 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* // Just one reaction queue and mutex for each environment. scheduler->triggered_reactions = calloc(1, sizeof(pqueue_t*)); - scheduler->array_of_mutexes = (lf_mutex_t*)calloc(1, sizeof(lf_mutex_t)); scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index c585f38fe..4c1eb5458 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -30,6 +30,7 @@ // Data specific to the NP scheduler. typedef struct custom_scheduler_data_t { reaction_t** executing_reactions; + lf_mutex_t* array_of_mutexes; } custom_scheduler_data_t; /////////////////// Scheduler Private API ///////////////////////// @@ -57,7 +58,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction // reactions (and therefore calling this function). if (reaction_level == current_level) { LF_PRINT_DEBUG("Scheduler: Trying to lock the mutex for level %zu.", reaction_level); - LF_MUTEX_LOCK(&scheduler->array_of_mutexes[reaction_level]); + LF_MUTEX_LOCK(&scheduler->custom_data->array_of_mutexes[reaction_level]); LF_PRINT_DEBUG("Scheduler: Locked the mutex for level %zu.", reaction_level); } // The level index for the current level can sometimes become negative. Set @@ -75,7 +76,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction LF_PRINT_DEBUG("Scheduler: Index for level %zu is at %d.", reaction_level, reaction_q_level_index); #ifdef FEDERATED if (reaction_level == current_level) { - LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[reaction_level]); + LF_MUTEX_UNLOCK(&scheduler->custom_data->array_of_mutexes[reaction_level]); } #endif } @@ -244,7 +245,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* env->scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); - env->scheduler->array_of_mutexes = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); + env->scheduler->custom_data->array_of_mutexes = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); env->scheduler->indexes = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); @@ -261,7 +262,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* LF_PRINT_DEBUG("Scheduler: Initialized vector of reactions for level %zu with size %zu", i, queue_size); // Initialize the mutexes for the reaction vectors - LF_MUTEX_INIT(&env->scheduler->array_of_mutexes[i]); + LF_MUTEX_INIT(&env->scheduler->custom_data->array_of_mutexes[i]); } env->scheduler->custom_data->executing_reactions = ((reaction_t***)env->scheduler->triggered_reactions)[0]; } @@ -278,6 +279,7 @@ void lf_sched_free(lf_scheduler_t* scheduler) { } free(scheduler->triggered_reactions); } + free(scheduler->custom_data->array_of_mutexes); free(scheduler->custom_data); lf_semaphore_destroy(scheduler->semaphore); } @@ -303,7 +305,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu #ifdef FEDERATED // Need to lock the mutex because federate.c could trigger reactions at // the current level (if there is a causality loop) - LF_MUTEX_LOCK(&scheduler->array_of_mutexes[current_level]); + LF_MUTEX_LOCK(&scheduler->custom_data->array_of_mutexes[current_level]); #endif int current_level_q_index = lf_atomic_add_fetch32((int32_t*)&scheduler->indexes[current_level], -1); if (current_level_q_index >= 0) { @@ -314,7 +316,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu scheduler->custom_data->executing_reactions[current_level_q_index] = NULL; } #ifdef FEDERATED - lf_mutex_unlock(&scheduler->array_of_mutexes[current_level]); + lf_mutex_unlock(&scheduler->custom_data->array_of_mutexes[current_level]); #endif if (reaction_to_return != NULL) { diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index 0f1f8519e..10de26628 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -89,14 +89,6 @@ typedef struct lf_scheduler_t { */ void* triggered_reactions; - /** - * @brief Mutex used to protect the reaction queue. - * - * Can be used to avoid race conditions. Schedulers are allowed to - * initialize as many mutexes as they deem fit. - */ - lf_mutex_t* array_of_mutexes; - /** * @brief An array of atomic indexes. * From ba7fcbe8b06b636f657666e4315d60eb92c213c8 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 19:29:35 +0200 Subject: [PATCH 13/36] Moved triggered_reactions to custom_data --- core/threaded/scheduler_GEDF_NP.c | 7 ----- core/threaded/scheduler_NP.c | 34 +++++++++++++--------- include/core/threaded/scheduler_instance.h | 5 ---- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index da63aed61..7bc3b8dda 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -78,9 +78,6 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* } lf_scheduler_t* scheduler = env->scheduler; - // Just one reaction queue and mutex for each environment. - scheduler->triggered_reactions = calloc(1, sizeof(pqueue_t*)); - scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); // Initialize the reaction queue. @@ -100,10 +97,6 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* * This must be called when the scheduler is no longer needed. */ void lf_sched_free(lf_scheduler_t* scheduler) { - // for (size_t j = 0; j <= scheduler->max_reaction_level; j++) { - // pqueue_free(scheduler->triggered_reactions[j]); - // FIXME: This is causing weird memory errors. - // } pqueue_free((pqueue_t*)scheduler->custom_data->reaction_q); free(scheduler->custom_data); } diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 4c1eb5458..bb6f5959f 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -31,13 +31,13 @@ typedef struct custom_scheduler_data_t { reaction_t** executing_reactions; lf_mutex_t* array_of_mutexes; + reaction_t*** triggered_reactions; } custom_scheduler_data_t; /////////////////// Scheduler Private API ///////////////////////// /** - * @brief Insert 'reaction' into - * scheduler->triggered_reactions at the appropriate level. + * @brief Insert 'reaction' into scheduler->triggered_reactions at the appropriate level. * * @param reaction The reaction to insert. */ @@ -72,7 +72,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction assert(reaction_q_level_index >= 0); LF_PRINT_DEBUG("Scheduler: Accessing triggered reactions at the level %zu with index %d.", reaction_level, reaction_q_level_index); - ((reaction_t***)scheduler->triggered_reactions)[reaction_level][reaction_q_level_index] = reaction; + ((reaction_t***)scheduler->custom_data->triggered_reactions)[reaction_level][reaction_q_level_index] = reaction; LF_PRINT_DEBUG("Scheduler: Index for level %zu is at %d.", reaction_level, reaction_q_level_index); #ifdef FEDERATED if (reaction_level == current_level) { @@ -96,11 +96,11 @@ static int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { try_advance_level(scheduler->env, &scheduler->next_reaction_level); scheduler->custom_data->executing_reactions = - (void*)((reaction_t***)scheduler->triggered_reactions)[scheduler->next_reaction_level - 1]; + scheduler->custom_data->triggered_reactions[scheduler->next_reaction_level - 1]; LF_PRINT_DEBUG("Start of rxn queue at %zu is %p", scheduler->next_reaction_level - 1, (void*)((reaction_t**)scheduler->custom_data->executing_reactions)[0]); - if (((reaction_t**)scheduler->custom_data->executing_reactions)[0] != NULL) { + if (scheduler->custom_data->executing_reactions[0] != NULL) { // There is at least one reaction to execute return 1; } @@ -241,13 +241,17 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* LF_PRINT_DEBUG("Scheduler: Max reaction level: %zu", env->scheduler->max_reaction_level); - env->scheduler->triggered_reactions = calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**)); + env->scheduler->custom_data + = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); - env->scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); + env->scheduler->custom_data->triggered_reactions + = (reaction_t***)calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**)); - env->scheduler->custom_data->array_of_mutexes = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); + env->scheduler->custom_data->array_of_mutexes + = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); - env->scheduler->indexes = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); + env->scheduler->indexes + = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); size_t queue_size = INITIAL_REACT_QUEUE_SIZE; for (size_t i = 0; i <= env->scheduler->max_reaction_level; i++) { @@ -257,14 +261,16 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* } } // Initialize the reaction vectors - ((reaction_t***)env->scheduler->triggered_reactions)[i] = (reaction_t**)calloc(queue_size, sizeof(reaction_t*)); + env->scheduler->custom_data->triggered_reactions[i] + = (reaction_t**)calloc(queue_size, sizeof(reaction_t*)); LF_PRINT_DEBUG("Scheduler: Initialized vector of reactions for level %zu with size %zu", i, queue_size); // Initialize the mutexes for the reaction vectors LF_MUTEX_INIT(&env->scheduler->custom_data->array_of_mutexes[i]); } - env->scheduler->custom_data->executing_reactions = ((reaction_t***)env->scheduler->triggered_reactions)[0]; + env->scheduler->custom_data->executing_reactions + = env->scheduler->custom_data->triggered_reactions[0]; } /** @@ -273,11 +279,11 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* * This must be called when the scheduler is no longer needed. */ void lf_sched_free(lf_scheduler_t* scheduler) { - if (scheduler->triggered_reactions) { + if (scheduler->custom_data->triggered_reactions) { for (size_t j = 0; j <= scheduler->max_reaction_level; j++) { - free(((reaction_t***)scheduler->triggered_reactions)[j]); + free(scheduler->custom_data->triggered_reactions[j]); } - free(scheduler->triggered_reactions); + free(scheduler->custom_data->triggered_reactions); } free(scheduler->custom_data->array_of_mutexes); free(scheduler->custom_data); diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index 10de26628..f734b6fd4 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -84,11 +84,6 @@ typedef struct lf_scheduler_t { */ volatile bool should_stop; - /** - * @brief Hold triggered reactions. - */ - void* triggered_reactions; - /** * @brief An array of atomic indexes. * From 7a6541ae33fda9ee9159adb532f36db6f13a0a58 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 22 May 2024 19:43:16 +0200 Subject: [PATCH 14/36] Moved semaphore to custom_data --- core/threaded/scheduler_NP.c | 20 ++++++++++++-------- core/threaded/scheduler_instance.c | 1 - include/core/threaded/scheduler_instance.h | 16 +--------------- 3 files changed, 13 insertions(+), 24 deletions(-) diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index bb6f5959f..f64f73cb7 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -32,6 +32,10 @@ typedef struct custom_scheduler_data_t { reaction_t** executing_reactions; lf_mutex_t* array_of_mutexes; reaction_t*** triggered_reactions; + lf_semaphore_t* semaphore; // Signal the maximum number of worker threads that should + // be executing work at the same time. Initially 0. + // For example, if the scheduler releases the semaphore with a count of 4, + // no more than 4 worker threads should wake up to process reactions. } custom_scheduler_data_t; /////////////////// Scheduler Private API ///////////////////////// @@ -129,7 +133,7 @@ static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { if (workers_to_awaken > 1) { // Notify all the workers except the worker thread that has called this // function. - lf_semaphore_release(scheduler->semaphore, (workers_to_awaken - 1)); + lf_semaphore_release(scheduler->custom_data->semaphore, (workers_to_awaken - 1)); } } @@ -139,7 +143,7 @@ static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { */ static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { scheduler->should_stop = true; - lf_semaphore_release(scheduler->semaphore, (scheduler->number_of_workers - 1)); + lf_semaphore_release(scheduler->custom_data->semaphore, (scheduler->number_of_workers - 1)); } /** @@ -184,7 +188,7 @@ static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* schedul * * If the calling worker thread is the last to become idle, it will call on the * scheduler to distribute work. Otherwise, it will wait on - * 'scheduler->semaphore'. + * 'scheduler->custom_data->semaphore'. * * @param worker_number The worker number of the worker thread asking for work * to be assigned to it. @@ -199,10 +203,8 @@ static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_num _lf_scheduler_try_advance_tag_and_distribute(scheduler); } else { // Not the last thread to become idle. Wait for work to be released. - LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling " - "semaphore.", - worker_number); - lf_semaphore_acquire(scheduler->semaphore); + LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling semaphore.", worker_number); + lf_semaphore_acquire(scheduler->custom_data->semaphore); LF_PRINT_DEBUG("Scheduler: Worker %zu acquired the scheduling semaphore.", worker_number); } } @@ -250,6 +252,8 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* env->scheduler->custom_data->array_of_mutexes = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); + env->scheduler->custom_data->semaphore = lf_semaphore_new(0); + env->scheduler->indexes = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); @@ -286,8 +290,8 @@ void lf_sched_free(lf_scheduler_t* scheduler) { free(scheduler->custom_data->triggered_reactions); } free(scheduler->custom_data->array_of_mutexes); + lf_semaphore_destroy(scheduler->custom_data->semaphore); free(scheduler->custom_data); - lf_semaphore_destroy(scheduler->semaphore); } ///////////////////// Scheduler Worker API (public) ///////////////////////// diff --git a/core/threaded/scheduler_instance.c b/core/threaded/scheduler_instance.c index 5487ead65..40479c378 100644 --- a/core/threaded/scheduler_instance.c +++ b/core/threaded/scheduler_instance.c @@ -32,7 +32,6 @@ bool init_sched_instance(environment_t* env, lf_scheduler_t** instance, size_t n } } - (*instance)->semaphore = lf_semaphore_new(0); (*instance)->number_of_workers = number_of_workers; (*instance)->next_reaction_level = 1; diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index f734b6fd4..12d38136f 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -42,8 +42,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define NUMBER_OF_WORKERS 1 #endif // NUMBER_OF_WORKERS -#include "lf_semaphore.h" #include +#include // for size_t #define DEFAULT_MAX_REACTION_LEVEL 100 @@ -65,20 +65,6 @@ typedef struct lf_scheduler_t { */ size_t max_reaction_level; - /** - * @brief Used by the scheduler to signal the maximum number of worker - * threads that should be executing work at the same time. - * - * Initially, the count is set to 0. Maximum value of count should be - * `number_of_workers`. - * - * For example, if the scheduler releases the semaphore with a count of 4, - * no more than 4 worker threads should wake up to process reactions. - * - * FIXME: specific comment - */ - lf_semaphore_t* semaphore; - /** * @brief Indicate whether the program should stop */ From 90ea06c076b85aa99633a0af1221b22e3c4de5bd Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Thu, 23 May 2024 09:43:02 +0200 Subject: [PATCH 15/36] Format --- core/threaded/scheduler_NP.c | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index f64f73cb7..868683732 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -243,19 +243,17 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* LF_PRINT_DEBUG("Scheduler: Max reaction level: %zu", env->scheduler->max_reaction_level); - env->scheduler->custom_data - = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); + env->scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); - env->scheduler->custom_data->triggered_reactions - = (reaction_t***)calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**)); + env->scheduler->custom_data->triggered_reactions = + (reaction_t***)calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**)); - env->scheduler->custom_data->array_of_mutexes - = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); + env->scheduler->custom_data->array_of_mutexes = + (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t)); env->scheduler->custom_data->semaphore = lf_semaphore_new(0); - env->scheduler->indexes - = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); + env->scheduler->indexes = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); size_t queue_size = INITIAL_REACT_QUEUE_SIZE; for (size_t i = 0; i <= env->scheduler->max_reaction_level; i++) { @@ -265,16 +263,14 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* } } // Initialize the reaction vectors - env->scheduler->custom_data->triggered_reactions[i] - = (reaction_t**)calloc(queue_size, sizeof(reaction_t*)); + env->scheduler->custom_data->triggered_reactions[i] = (reaction_t**)calloc(queue_size, sizeof(reaction_t*)); LF_PRINT_DEBUG("Scheduler: Initialized vector of reactions for level %zu with size %zu", i, queue_size); // Initialize the mutexes for the reaction vectors LF_MUTEX_INIT(&env->scheduler->custom_data->array_of_mutexes[i]); } - env->scheduler->custom_data->executing_reactions - = env->scheduler->custom_data->triggered_reactions[0]; + env->scheduler->custom_data->executing_reactions = env->scheduler->custom_data->triggered_reactions[0]; } /** From 297c68b6851a124d50a15c046bc55e5d365d9190 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Thu, 23 May 2024 19:10:26 +0200 Subject: [PATCH 16/36] Point to gedf master branch for testing --- lingua-franca-ref.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..1a93378e4 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +gedf From d5b1a6dc042eca35c6c0b2548d11e4495962ac4e Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:05:25 +0200 Subject: [PATCH 17/36] File header comments only --- core/environment.c | 31 +++++-------------------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/core/environment.c b/core/environment.c index 4523c4721..d2d56a593 100644 --- a/core/environment.c +++ b/core/environment.c @@ -1,32 +1,11 @@ /** * @file - * @author Erling R. Jellum (erling.r.jellum@ntnu.no) + * @author Erling R. Jellum + * @copyright (c) 2023-2024, The Norwegian University of Science and Technology. + * License: BSD 2-clause * - * @section LICENSE - * Copyright (c) 2023, The Norwegian University of Science and Technology. - * - * Redistribution and use in source and binary forms, with or without modification, - * are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * @section DESCRIPTION Functions intitializing and freeing memory for environments. - * See environment.h for docs. + * This file defines functions intitializing and freeing memory for environments. + * See environment.h for docs. */ #include "environment.h" From 08f65ff661278b2c3b505bec97e193ba5d38fbce Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:06:10 +0200 Subject: [PATCH 18/36] Debug statements only --- core/federated/federate.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index cf0608eb0..498687d07 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2612,17 +2612,16 @@ void lf_spawn_staa_thread() { lf_thread_create(&_fed.staaSetter, update_ports_fr #endif // FEDERATED_DECENTRALIZED void lf_stall_advance_level_federation_locked(size_t level) { - LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance); + LF_PRINT_DEBUG("Waiting for MLAA %d to exceed level %zu.", max_level_allowed_to_advance, level); while (((int)level) >= max_level_allowed_to_advance) { lf_cond_wait(&lf_port_status_changed); }; - LF_PRINT_DEBUG("Exiting wait with MLAA %d and next_reaction_level %zu.", max_level_allowed_to_advance, level); + LF_PRINT_DEBUG("Exiting wait with MLAA %d and level %zu.", max_level_allowed_to_advance, level); } void lf_stall_advance_level_federation(environment_t* env, size_t level) { LF_PRINT_DEBUG("Acquiring the environment mutex."); LF_MUTEX_LOCK(&env->mutex); - LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance); lf_stall_advance_level_federation_locked(level); LF_MUTEX_UNLOCK(&env->mutex); } From 6fba73d5ae6ed8e12a517c799ec96abf9a1ff8e2 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:08:17 +0200 Subject: [PATCH 19/36] Comments and debug statements only --- core/threaded/reactor_threaded.c | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 761e92543..b2a9505e1 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -1,8 +1,8 @@ /** * @file - * @author Edward A. Lee (eal@berkeley.edu) - * @author{Marten Lohstroh } - * @author{Soroush Bateni } + * @author Edward A. Lee + * @author Marten Lohstroh + * @author Soroush Bateni * @copyright (c) 2020-2024, The University of California at Berkeley. * License: BSD 2-clause * @brief Runtime infrastructure for the threaded version of the C target of Lingua Franca. @@ -850,16 +850,6 @@ void _lf_worker_invoke_reaction(environment_t* env, int worker_number, reaction_ reaction->is_STP_violated = false; } -void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) { -#ifdef FEDERATED - lf_stall_advance_level_federation(env, *next_reaction_level); -#else - (void)env; -#endif - if (*next_reaction_level < SIZE_MAX) - *next_reaction_level += 1; -} - /** * @brief The main looping logic of each LF worker thread. * @@ -886,10 +876,9 @@ void _lf_worker_do_work(environment_t* env, int worker_number) { while ((current_reaction_to_execute = lf_sched_get_ready_reaction(env->scheduler, worker_number)) != NULL) { // Got a reaction that is ready to run. LF_PRINT_DEBUG("Worker %d: Got from scheduler reaction %s: " - "level: %lld, is input reaction: %d, chain ID: %llu, and deadline " PRINTF_TIME ".", + "level: %lld, is input reaction: %d, and deadline " PRINTF_TIME ".", worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index), - current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id, - current_reaction_to_execute->deadline); + current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->deadline); bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute); From 78d8755f46a4df0c2f57b931e0304a3b1ad8c1a2 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:11:01 +0200 Subject: [PATCH 20/36] Comments and ifdef guard --- core/threaded/scheduler_GEDF_NP.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 7bc3b8dda..5c7ff80ab 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -1,12 +1,20 @@ /** * @file - * @author{Soroush Bateni } - * @author{Edward A. Lee } - * @author{Marten Lohstroh } + * @author Soroush Bateni + * @author Edward A. Lee + * @author Marten Lohstroh * @copyright (c) 2020-2024, The University of California at Berkeley. * License: BSD 2-clause * @brief Global Earliest Deadline First (GEDF) non-preemptive scheduler for the * threaded runtime of the C target of Lingua Franca. + * + * At each tag, this scheduler prioritizes reactions with the smallest (inferred) deadline. + * An inferred deadline for reaction _R_ is either an explicitly declared deadline or the declared deadline of + * a reaction that depends on _R_. This scheduler is non-preemptive, meaning that once a worker thread starts + * executing a reaction, it will execute that reaction to completion. The underlying thread scheduler, of + * course, could preempt the execution in favor of some other worker thread. + * This scheduler does not take into account execution times of reactions. + * Moreover, it does not prioritize reactions across distinct tags. */ #include "lf_types.h" @@ -221,7 +229,7 @@ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reacti // has one level higher than the current level. No need to notify idle threads. // But in federated execution, it could be called because of message arrival. // Also, in modal models, reset and startup reactions may be triggered. -#if defined(FEDERATED) || defined(MODAL) +#if defined(FEDERATED) || (defined(MODAL) && !defined(LF_SINGLE_THREADED)) reaction_t* triggered_reaction = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q); if (LF_LEVEL(triggered_reaction->index) == scheduler->custom_data->current_level) { LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed); From f363abe2c408306e7292427cb1f4457caab1f3c7 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:13:02 +0200 Subject: [PATCH 21/36] Comments only --- core/threaded/scheduler_sync_tag_advance.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/threaded/scheduler_sync_tag_advance.c b/core/threaded/scheduler_sync_tag_advance.c index 5b52495a4..cc91c88f0 100644 --- a/core/threaded/scheduler_sync_tag_advance.c +++ b/core/threaded/scheduler_sync_tag_advance.c @@ -1,8 +1,8 @@ /** - * @file scheduler_sync_tag_advance.c - * @author Soroush Bateni (soroush@utdallas.edu) - * @author Edward A. Lee - * @author Marten Lohstroh + * @file + * @author Soroush Bateni + * @author Edward A. Lee + * @author Marten Lohstroh * @brief API used to advance tag globally. * @copyright (c) 2020-2024, The University of California at Berkeley and The University of Texas at Dallas * License: BSD 2-clause From f730dd217a0eddf11957a6dab3f555c0314bdd02 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:13:27 +0200 Subject: [PATCH 22/36] Debug statement only --- core/utils/pqueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/utils/pqueue.c b/core/utils/pqueue.c index b2bf05090..65f6dd1d9 100644 --- a/core/utils/pqueue.c +++ b/core/utils/pqueue.c @@ -35,5 +35,5 @@ void set_reaction_position(void* reaction, size_t pos) { ((reaction_t*)reaction) void print_reaction(void* reaction) { reaction_t* r = (reaction_t*)reaction; - LF_PRINT_DEBUG("%s: chain_id: %llu, index: %llx, reaction: %p", r->name, r->chain_id, r->index, reaction); + LF_PRINT_DEBUG("%s: index: %llx, reaction: %p", r->name, r->index, reaction); } From 5a7ca7b339938d41ae19b9b350a82a1583949848 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:13:58 +0200 Subject: [PATCH 23/36] File header comments only --- include/core/environment.h | 30 +++++------------------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/include/core/environment.h b/include/core/environment.h index a776dee95..038f97e4e 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -1,31 +1,11 @@ /** * @file - * @author Erling R. Jellum (erling.r.jellum@ntnu.no) + * @author Erling R. Jellum + * @copyright (c) 2023, The Norwegian University of Science and Technology. + * License: BSD 2-clause + * @brief API for the environment data structure. * - * @section LICENSE - * Copyright (c) 2023, The Norwegian University of Science and Technology. - * - * Redistribution and use in source and binary forms, with or without modification, - * are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * @section DESCRIPTION API for creating and destroying environments. An environment is the + * This is an API for creating and destroying environments. An environment is the * "context" within which the reactors are executed. The environment contains data structures * which are shared among the reactors such as priority queues, the current logical tag, * the worker scheduler, and a lot of meta data. Each reactor stores a pointer to its From 9ee30928159e4497d109c6728dc8b1579425452e Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:15:15 +0200 Subject: [PATCH 24/36] File header comments only --- include/core/threaded/scheduler_sync_tag_advance.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/core/threaded/scheduler_sync_tag_advance.h b/include/core/threaded/scheduler_sync_tag_advance.h index fe0390f86..309fffd1e 100644 --- a/include/core/threaded/scheduler_sync_tag_advance.h +++ b/include/core/threaded/scheduler_sync_tag_advance.h @@ -1,8 +1,8 @@ /** - * @file scheduler_sync_tag_advance.h - * @author Soroush Bateni (soroush@utdallas.edu) - * @author Edward A. Lee - * @author Marten Lohstroh + * @file + * @author Soroush Bateni + * @author Edward A. Lee + * @author Marten Lohstroh * @brief API used to advance tag globally. * @copyright (c) 2020-2024, The University of California at Berkeley and The University of Texas at Dallas * License: BSD 2-clause From 167093a0cce417402a93a09dd352f8b050b654dd Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:16:33 +0200 Subject: [PATCH 25/36] Remove chain_id --- include/core/lf_types.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/include/core/lf_types.h b/include/core/lf_types.h index 75a61e405..a3a103041 100644 --- a/include/core/lf_types.h +++ b/include/core/lf_types.h @@ -157,9 +157,7 @@ struct reaction_t { void* self; // Pointer to a struct with the reactor's state. INSTANCE. int number; // The number of the reaction in the reactor (0 is the first reaction). index_t index; // Inverse priority determined by dependency analysis. INSTANCE. - // Binary encoding of the branches that this reaction has upstream in the dependency graph. INSTANCE. - unsigned long long chain_id; - size_t pos; // Current position in the priority queue. RUNTIME. + size_t pos; // Current position in the priority queue. RUNTIME. reaction_t* last_enabling_reaction; // The last enabling reaction, or NULL if there is none. Used for optimization. INSTANCE. size_t num_outputs; // Number of outputs that may possibly be produced by this function. COMMON. From 8a6185103d6dcbcba7e2d4e474ab8a0a8c0f1a8c Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:17:07 +0200 Subject: [PATCH 26/36] Comments only --- include/core/threaded/scheduler.h | 42 ++++--------------------------- 1 file changed, 5 insertions(+), 37 deletions(-) diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h index 44997bc74..f49f0bc54 100644 --- a/include/core/threaded/scheduler.h +++ b/include/core/threaded/scheduler.h @@ -1,38 +1,13 @@ -/************* -Copyright (c) 2022, The University of Texas at Dallas. -Copyright (c) 2022, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * @file scheduler.h - * @author Soroush Bateni + * @file + * @author Soroush Bateni + * @author Edward A. Lee + * @copyright (c) 2022-2024, The University of Texas at Dallas and The University of California at Berkeley. + * License: BSD 2-clause * @brief Scheduler API for the threaded C runtime. * * A scheduler for the threaded runtime of reactor-c should provide an * implementation for functions that are defined in this header file. - * - * @copyright Copyright (c) 2022, The University of Texas at Dallas. - * @copyright Copyright (c) 2022, The University of California at Berkeley. */ #ifndef LF_SCHEDULER_H @@ -40,13 +15,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "lf_types.h" #include "scheduler_instance.h" -/** - * @brief Default value that is assumed to be the maximum reaction level in the - * program. - * - * Can be overriden by passing the appropriate `parameters` argument to - * `lf_sched_init`. - */ /** * @brief Initialize the scheduler. From db099bbf21d82ca71c0023e72aa47c3bf8221bba Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:35:17 +0200 Subject: [PATCH 27/36] Comments only --- core/threaded/scheduler_NP.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 868683732..53957dd9c 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -1,8 +1,8 @@ /** * @file - * @author{Soroush Bateni } - * @author{Edward A. Lee } - * @author{Marten Lohstroh } + * @author Soroush Bateni + * @author Edward A. Lee + * @author Marten Lohstroh * @copyright (c) 2020-2024, The University of California at Berkeley. * License: BSD 2-clause * @brief Non-preemptive scheduler for the threaded runtime of the C target of Lingua Franca. From 5c516f3e5260cf169a85b3c7ec5de61ebff4c62e Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:36:19 +0200 Subject: [PATCH 28/36] Comments only --- core/threaded/scheduler_instance.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/threaded/scheduler_instance.c b/core/threaded/scheduler_instance.c index 40479c378..6e225bf33 100644 --- a/core/threaded/scheduler_instance.c +++ b/core/threaded/scheduler_instance.c @@ -1,3 +1,14 @@ +/** + * @file + * @author Soroush Bateni + * @author Edward A. Lee + * @copyright (c) 2022-2024, The University of Texas at Dallas and The University of California at Berkeley. + * License: BSD 2-clause + * @brief Common scheduler functions. + * + * This file defines functions that are common across multiple schedulers. + */ + #include #include "scheduler_instance.h" #include "environment.h" From 83502addb05e3af034a472d3760912bb35f3ab3b Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:37:27 +0200 Subject: [PATCH 29/36] Comments only --- include/core/threaded/reactor_threaded.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index fa3513319..ed0c14241 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -1,8 +1,8 @@ /** * @file - * @author Edward A. Lee (eal@berkeley.edu) - * @author{Marten Lohstroh } - * @author{Soroush Bateni } + * @author Edward A. Lee + * @author Marten Lohstroh + * @author Soroush Bateni * @copyright (c) 2020-2024, The University of California at Berkeley. * License: BSD 2-clause * @brief Runtime infrastructure for the threaded version of the C target of Lingua Franca. From 98e97f733960c00ec0e2ac0a3529ffee60f64083 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:37:53 +0200 Subject: [PATCH 30/36] Comments only --- include/core/threaded/scheduler_instance.h | 40 ++++------------------ 1 file changed, 7 insertions(+), 33 deletions(-) diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index 12d38136f..5ae9bc8d3 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -1,38 +1,12 @@ -/************* -Copyright (c) 2022, The University of Texas at Dallas. Copyright (c) 2022, The -University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * @file scheduler_params.h - * @author Soroush Bateni - * @brief Scheduler parameters. - * - * Meant for book-keeping in the threaded schedulers in the reactor C runtime. + * @file + * @author Soroush Bateni + * @author Edward A. Lee + * @copyright (c) 2022-2024, The University of Texas at Dallas and The University of California at Berkeley. + * License: BSD 2-clause + * @brief Common scheduler parameters. * - * @copyright Copyright (c) 2022, The University of Texas at Dallas. - * @copyright Copyright (c) 2022, The University of California at Berkeley. + * This file defines data types and functions that are common across multiple schedulers. */ #ifndef LF_SCHEDULER_PARAMS_H From 29f657c6299a3704fe68632404c9952256742973 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:39:44 +0200 Subject: [PATCH 31/36] Comments and cleanup only --- core/threaded/scheduler_adaptive.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c index 4b0843028..e781919a3 100644 --- a/core/threaded/scheduler_adaptive.c +++ b/core/threaded/scheduler_adaptive.c @@ -1,6 +1,6 @@ /** * @file - * @author{Peter Donovan } + * @author Peter Donovan * @copyright (c) 2020-2024, The University of California at Berkeley. * License: BSD 2-clause * @brief This is a non-priority-driven scheduler. See scheduler.h for documentation. @@ -684,7 +684,6 @@ void lf_sched_free(lf_scheduler_t* scheduler) { worker_assignments_free(scheduler); data_collection_free(scheduler); free(scheduler->custom_data); - lf_semaphore_destroy(scheduler->semaphore); } ///////////////////////// Scheduler Worker API /////////////////////////////// From 80b16c693ab1faa1815eeba38e0a53fa10226382 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:42:21 +0200 Subject: [PATCH 32/36] Moved next_reaction_level to custom_data and out of scheduler API --- core/threaded/scheduler_NP.c | 37 ++++++++++++---------- core/threaded/scheduler_adaptive.c | 11 +++++-- core/threaded/scheduler_instance.c | 1 - include/core/threaded/reactor_threaded.h | 11 ------- include/core/threaded/scheduler_instance.h | 5 --- 5 files changed, 29 insertions(+), 36 deletions(-) diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 53957dd9c..6ff6a0048 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -32,6 +32,7 @@ typedef struct custom_scheduler_data_t { reaction_t** executing_reactions; lf_mutex_t* array_of_mutexes; reaction_t*** triggered_reactions; + volatile size_t next_reaction_level; lf_semaphore_t* semaphore; // Signal the maximum number of worker threads that should // be executing work at the same time. Initially 0. // For example, if the scheduler releases the semaphore with a count of 4, @@ -50,14 +51,14 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction #ifdef FEDERATED // Lock the mutex if federated because a federate can insert reactions with // a level equal to the current level. - size_t current_level = scheduler->next_reaction_level - 1; + size_t current_level = scheduler->custom_data->next_reaction_level - 1; // There is a race condition here where - // `scheduler->next_reaction_level` can change after it is + // `scheduler->custom_data->next_reaction_level` can change after it is // cached here. In that case, if the cached value is equal to // `reaction_level`, the cost will be an additional unnecessary mutex lock, // but no logic error. If the cached value is not equal to `reaction_level`, // it can never become `reaction_level` because the scheduler will only - // change the `scheduler->next_reaction_level` if it can + // change the `scheduler->custom_data->next_reaction_level` if it can // ensure that all worker threads are idle, and thus, none are triggering // reactions (and therefore calling this function). if (reaction_level == current_level) { @@ -95,15 +96,17 @@ static int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) { // Note: All the threads are idle, which means that they are done inserting // reactions. Therefore, the reaction vectors can be accessed without // locking a mutex. - while (scheduler->next_reaction_level <= scheduler->max_reaction_level) { - LF_PRINT_DEBUG("Waiting with curr_reaction_level %zu.", scheduler->next_reaction_level); - try_advance_level(scheduler->env, &scheduler->next_reaction_level); - + while (scheduler->custom_data->next_reaction_level <= scheduler->max_reaction_level) { +#ifdef FEDERATED + lf_stall_advance_level_federation(scheduler->env, scheduler->custom_data->next_reaction_level); +#endif scheduler->custom_data->executing_reactions = - scheduler->custom_data->triggered_reactions[scheduler->next_reaction_level - 1]; - - LF_PRINT_DEBUG("Start of rxn queue at %zu is %p", scheduler->next_reaction_level - 1, + scheduler->custom_data->triggered_reactions[scheduler->custom_data->next_reaction_level]; + LF_PRINT_DEBUG("Start of rxn queue at %zu is %p", scheduler->custom_data->next_reaction_level, (void*)((reaction_t**)scheduler->custom_data->executing_reactions)[0]); + + scheduler->custom_data->next_reaction_level++; + if (scheduler->custom_data->executing_reactions[0] != NULL) { // There is at least one reaction to execute return 1; @@ -123,8 +126,8 @@ static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) { // number of reactions enabled at this level. // Note: All threads are idle. Therefore, there is no need to lock the mutex while accessing the index for the // current level. - size_t workers_to_awaken = - LF_MIN(scheduler->number_of_idle_workers, (size_t)(scheduler->indexes[scheduler->next_reaction_level - 1])); + size_t workers_to_awaken = LF_MIN(scheduler->number_of_idle_workers, + (size_t)(scheduler->indexes[scheduler->custom_data->next_reaction_level - 1])); LF_PRINT_DEBUG("Scheduler: Notifying %zu workers.", workers_to_awaken); scheduler->number_of_idle_workers -= workers_to_awaken; @@ -157,12 +160,12 @@ static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { // Reset the index environment_t* env = scheduler->env; - scheduler->indexes[scheduler->next_reaction_level - 1] = 0; + scheduler->indexes[scheduler->custom_data->next_reaction_level - 1] = 0; // Loop until it's time to stop or work has been distributed while (true) { - if (scheduler->next_reaction_level == (scheduler->max_reaction_level + 1)) { - scheduler->next_reaction_level = 0; + if (scheduler->custom_data->next_reaction_level == (scheduler->max_reaction_level + 1)) { + scheduler->custom_data->next_reaction_level = 0; LF_MUTEX_LOCK(&env->mutex); // Nothing more happening at this tag. LF_PRINT_DEBUG("Scheduler: Advancing tag."); @@ -253,6 +256,8 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* env->scheduler->custom_data->semaphore = lf_semaphore_new(0); + env->scheduler->custom_data->next_reaction_level = 1; + env->scheduler->indexes = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int)); size_t queue_size = INITIAL_REACT_QUEUE_SIZE; @@ -306,7 +311,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu // Iterate until the stop tag is reached or reaction vectors are empty while (!scheduler->should_stop) { // Calculate the current level of reactions to execute - size_t current_level = scheduler->next_reaction_level - 1; + size_t current_level = scheduler->custom_data->next_reaction_level - 1; reaction_t* reaction_to_return = NULL; #ifdef FEDERATED // Need to lock the mutex because federate.c could trigger reactions at diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c index e781919a3..d2db00658 100644 --- a/core/threaded/scheduler_adaptive.c +++ b/core/threaded/scheduler_adaptive.c @@ -21,12 +21,14 @@ #include "environment.h" #include "util.h" +#ifdef FEDERATED +#include "federate.h" +#endif + #ifndef MAX_REACTION_LEVEL #define MAX_REACTION_LEVEL INITIAL_REACT_QUEUE_SIZE #endif -void try_advance_level(environment_t* env, volatile size_t* next_reaction_level); - /////////////////// Forward declarations ///////////////////////// extern bool fast; static void worker_states_lock(lf_scheduler_t* scheduler, size_t worker); @@ -435,7 +437,10 @@ static void advance_level_and_unlock(lf_scheduler_t* scheduler, size_t worker) { return; } } else { - try_advance_level(scheduler->env, &worker_assignments->current_level); +#ifdef FEDERATED + lf_stall_advance_level_federation(scheduler->env, worker_assignments->current_level); +#endif + worker_assignments->current_level++; set_level(scheduler, worker_assignments->current_level); } size_t total_num_reactions = get_num_reactions(scheduler); diff --git a/core/threaded/scheduler_instance.c b/core/threaded/scheduler_instance.c index 6e225bf33..8146327bf 100644 --- a/core/threaded/scheduler_instance.c +++ b/core/threaded/scheduler_instance.c @@ -44,7 +44,6 @@ bool init_sched_instance(environment_t* env, lf_scheduler_t** instance, size_t n } (*instance)->number_of_workers = number_of_workers; - (*instance)->next_reaction_level = 1; (*instance)->should_stop = false; (*instance)->env = env; diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index ed0c14241..0d58f7431 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -12,17 +12,6 @@ #include "lf_types.h" -/** - * @brief Advance to the next level. - * - * For federated runtimes, this function should - * stall the advance until we know that we can safely execute the next level - * given knowledge about upstream network port statuses. - * @param env The environment. - * @param next_reaction_level The place to store the next reaction level. - */ -void try_advance_level(environment_t* env, volatile size_t* next_reaction_level); - /** * Enqueue port absent reactions that will send a PORT_ABSENT * message to downstream federates if a given network output port is not present. diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index 5ae9bc8d3..df55a86be 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -68,11 +68,6 @@ typedef struct lf_scheduler_t { */ volatile size_t number_of_idle_workers; - /** - * @brief The next level of reactions to execute. - */ - volatile size_t next_reaction_level; - // Pointer to an optional custom data structure that each scheduler can define. // The type is forward declared here and must be declared again in the scheduler source file // Is not touched by `init_sched_instance` and must be initialized by each scheduler that needs it From 5b7aa458f584db3c07f6c9b929228ef360770f30 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Fri, 24 May 2024 09:56:05 +0200 Subject: [PATCH 33/36] Conditionally include federate.h --- core/threaded/scheduler_NP.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 6ff6a0048..7edd41a81 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -27,6 +27,10 @@ #include "util.h" #include "reactor_threaded.h" +#ifdef FEDERATED +#include "federate.h" +#endif + // Data specific to the NP scheduler. typedef struct custom_scheduler_data_t { reaction_t** executing_reactions; From 9f65472d80fe48f8dd99bfd3bbe28d5a0579dfdd Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Tue, 28 May 2024 11:11:02 +0200 Subject: [PATCH 34/36] Response to review --- core/threaded/scheduler_GEDF_NP.c | 79 +++++++++++++++++++------------ 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 5c7ff80ab..c322b4e2e 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -55,7 +55,7 @@ typedef struct custom_scheduler_data_t { * @param scheduler The scheduler. * @param worker_number The number of the worker thread. */ -inline static void wait_for_other_workers_to_finish(lf_scheduler_t* scheduler, int worker_number) { +inline static void wait_for_reaction_queue_updates(lf_scheduler_t* scheduler, int worker_number) { scheduler->number_of_idle_workers++; tracepoint_worker_wait_starts(scheduler->env, worker_number); LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); @@ -63,6 +63,49 @@ inline static void wait_for_other_workers_to_finish(lf_scheduler_t* scheduler, i scheduler->number_of_idle_workers--; } +/** + * @brief Assuming this is the last worker to go idle, advance the tag. + * @param scheduler The scheduler. + * @return Non-zero if the stop tag has been reached. + */ +static int advance_tag(lf_scheduler_t* scheduler) { + // Set a flag in the scheduler that the lock is held by the sole executing thread. + // This prevents acquiring the mutex in lf_scheduler_trigger_reaction. + scheduler->custom_data->solo_holds_mutex = true; + if (_lf_sched_advance_tag_locked(scheduler)) { + LF_PRINT_DEBUG("Scheduler: Reached stop tag."); + scheduler->should_stop = true; + scheduler->custom_data->solo_holds_mutex = false; + // Notify all threads that the stop tag has been reached. + LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); + return 1; + } + scheduler->custom_data->solo_holds_mutex = false; + // Reset the level to 0. + scheduler->custom_data->current_level = 0; +#ifdef FEDERATED + // In case there are blocking network input reactions at this level, stall. + lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level); +#endif + return 0; +} + +/** + * @brief Assuming all other workers are idle, advance to the next level. + * @param scheduler The scheduler. + */ +static void advance_level(lf_scheduler_t* scheduler) { + if (++scheduler->custom_data->current_level > scheduler->max_reaction_level) { + // Since the reaction queue is not empty, we must be cycling back to level 0 due to deadlines + // having been given precedence over levels. Reset the current level to 1. + scheduler->custom_data->current_level = 0; + } + LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", scheduler->custom_data->current_level); +#ifdef FEDERATED + // In case there are blocking network input reactions at this level, stall. + lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level); +#endif +} ///////////////////// Scheduler Init and Destroy API ///////////////////////// /** * @brief Initialize the scheduler. @@ -147,20 +190,11 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu // We need to wait to advance to the next level or get a new reaction at the current level. if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) { // All other workers are idle. Advance to the next level. - if (++scheduler->custom_data->current_level > scheduler->max_reaction_level) { - // Since the reaction queue is not empty, we must be cycling back to level 0 due to deadlines - // having been given precedence over levels. Reset the current level to 1. - scheduler->custom_data->current_level = 0; - } - LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", scheduler->custom_data->current_level); -#ifdef FEDERATED - // In case there are blocking network input reactions at this level, stall. - lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level); -#endif + advance_level(scheduler); } else { // Some workers are still working on reactions on the current level. // Wait for them to finish. - wait_for_other_workers_to_finish(scheduler, worker_number); + wait_for_reaction_queue_updates(scheduler, worker_number); } } } else { @@ -171,29 +205,14 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) { // Last thread to go idle LF_PRINT_DEBUG("Scheduler: Worker %d is advancing the tag.", worker_number); - // Advance the tag. - // Set a flag in the scheduler that the lock is held by the sole executing thread. - // This prevents acquiring the mutex in lf_scheduler_trigger_reaction. - scheduler->custom_data->solo_holds_mutex = true; - if (_lf_sched_advance_tag_locked(scheduler)) { - LF_PRINT_DEBUG("Scheduler: Reached stop tag."); - scheduler->should_stop = true; - scheduler->custom_data->solo_holds_mutex = false; - // Notify all threads that the stop tag has been reached. - LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed); + if (advance_tag(scheduler)) { + // Stop tag has been reached. break; } - scheduler->custom_data->solo_holds_mutex = false; - // Reset the level to 0. - scheduler->custom_data->current_level = 0; -#ifdef FEDERATED - // In case there are blocking network input reactions at this level, stall. - lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level); -#endif } else { // Some other workers are still working on reactions on the current level. // Wait for them to finish. - wait_for_other_workers_to_finish(scheduler, worker_number); + wait_for_reaction_queue_updates(scheduler, worker_number); } } } From 26924333d09b2b8d7505ff42af15fd18a085312d Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Tue, 28 May 2024 11:15:26 +0200 Subject: [PATCH 35/36] Comment only --- core/threaded/scheduler_GEDF_NP.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index c322b4e2e..65a8e4b50 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -178,7 +178,11 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu reaction_t* next_reaction = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q); if (next_reaction != NULL && LF_LEVEL(next_reaction->index) == scheduler->custom_data->current_level && scheduler->number_of_idle_workers > 0) { - // Notify an idle thread. + // Notify an idle thread. Note that we could do a broadcast here, but it's probably not + // a good idea because all workers awakened need to acquire the same mutex to examine the + // reaction queue. Only one of them will acquire the mutex, and that worker can check whether + // there are further reactions on the same level that warrant waking another worker thread. + // So we opt to wake one other worker here rather than broadcasting. LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed); } LF_MUTEX_UNLOCK(&scheduler->env->mutex); From d577ea99657ed1b78169b13f1c5c038e41257931 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Tue, 28 May 2024 18:35:33 +0200 Subject: [PATCH 36/36] Format --- core/threaded/scheduler_GEDF_NP.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 65a8e4b50..e77257209 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -90,7 +90,7 @@ static int advance_tag(lf_scheduler_t* scheduler) { return 0; } -/** +/** * @brief Assuming all other workers are idle, advance to the next level. * @param scheduler The scheduler. */