Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle messages arriving during initial STA wait #316

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ void _lf_initialize_start_tag(environment_t *env) {
// Add reactions invoked at tag (0,0) (including startup reactions) to the reaction queue
_lf_trigger_startup_reactions(env);

#ifdef FEDERATED
#if defined FEDERATED
// If env is the environment for the top-level enclave, then initialize the federate.
environment_t *top_level_env;
_lf_get_environments(&top_level_env);
Expand All @@ -703,24 +703,21 @@ void _lf_initialize_start_tag(environment_t *env) {
// Get a start_time from the RTI
synchronize_with_other_federates(); // Resets start_time in federated execution according to the RTI.
}

// The start time will likely have changed. Adjust the current tag and stop tag.
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};
if (duration >= 0LL) {
// A duration has been specified. Recalculate the stop time.
env->stop_tag = ((tag_t) {.time = start_time + duration, .microstep = 0});
}
#endif

_lf_initialize_timers(env);

// If the stop_tag is (0,0), also insert the shutdown
// reactions. This can only happen if the timeout time
// was set to 0.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
_lf_trigger_shutdown_reactions(env);
}
// If we have a non-zero STA offset, then we need to allow messages to arrive
// prior to the start time. To avoid spurious STP violations, we temporarily
// set the current time back by the STA offset.
env->current_tag = (tag_t){.time = start_time - _lf_fed_STA_offset, .microstep = 0u};

#if defined FEDERATED
// Call wait_until if federated. This is required because the startup procedure
// in synchronize_with_other_federates() can decide on a new start_time that is
// larger than the current physical time.
Expand All @@ -739,15 +736,26 @@ void _lf_initialize_start_tag(environment_t *env) {
// Here we wait until the start time and also release the environment mutex.
// this means that the other worker threads will be allowed to start. We need
// this to avoid potential deadlock in federated startup.
while(!wait_until(env, start_time, &env->event_q_changed)) {};
LF_PRINT_DEBUG("Done waiting for start time " PRINTF_TIME ".", start_time);
LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be small.",
// NOTE: wait_until automatically adds _lf_fed_STA_offset.
while(!wait_until(env, start_time + _lf_fed_STA_offset, &env->event_q_changed)) {};
LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + _lf_fed_STA_offset);
LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME
". This should be close to the STA offset.",
lf_time_physical() - start_time);

// Each federate executes the start tag (which is the current
// tag). Inform the RTI of this if needed.
send_next_event_tag(env, env->current_tag, true);
#endif
// Restore the current tag to match the start time.
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

// For messages that may have arrived while we were waiting, put
// reactions on the reaction queue.
_lf_pop_events(env);

// If the stop_tag is (0,0), also insert the shutdown
// reactions. This can only happen if the timeout time
// was set to 0.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
_lf_trigger_shutdown_reactions(env);
}

#ifdef FEDERATED_DECENTRALIZED
// In federated execution (at least under decentralized coordination),
Expand All @@ -759,7 +767,24 @@ void _lf_initialize_start_tag(environment_t *env) {
// to be removed, if appropriate before proceeding to executing tag (0,0).
_lf_wait_on_tag_barrier(env, (tag_t){.time=start_time,.microstep=0});
spawn_staa_thread();
#endif // FEDERATED_DECENTRALIZED

// Pull from the event queue any messages that have been received during waiting.

#else // NOT FEDERATED_DECENTRALIZED
// Each federate executes the start tag (which is the current
// tag). Inform the RTI of this if needed.
send_next_event_tag(env, env->current_tag, true);
#endif // NOT FEDERATED_DECENTRALIZED
#else // NOT FEDERATED
_lf_initialize_timers(env);

// If the stop_tag is (0,0), also insert the shutdown
// reactions. This can only happen if the timeout time
// was set to 0.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
_lf_trigger_shutdown_reactions(env);
}
#endif // NOT FEDERATED

// Set the following boolean so that other thread(s), including federated threads,
// know that the execution has started
Expand Down
Loading