Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zero-delay cycles #2366

Open
wants to merge 22 commits into
base: transient-fed
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions core/src/main/java/org/lflang/federated/extensions/CExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -551,15 +551,33 @@ protected String makePreamble(
// that handles incoming network messages destined to the specified
// port. This will only be used if there are federates.
int numOfNetworkActions = federate.networkMessageActions.size();
int numZDCNetworkActions = federate.zeroDelayCycleNetworkMessageActions.size();
code.pr(
"""
interval_t _lf_action_delay_table[%1$s];
lf_action_base_t* _lf_action_table[%1$s];
size_t _lf_action_table_size = %1$s;
lf_action_base_t* _lf_zero_delay_cycle_action_table[%2$s];
size_t _lf_zero_delay_cycle_action_table_size = %2$s;
"""
.formatted(numOfNetworkActions, federate.zeroDelayCycleNetworkMessageActions.size()));
.formatted(numOfNetworkActions));
if (numZDCNetworkActions > 0) {
code.pr(
"""
lf_action_base_t* _lf_zero_delay_cycle_action_table[%1$s];
size_t _lf_zero_delay_cycle_action_table_size = %1$s;
uint16_t _lf_zero_delay_cycle_upstream_ids[%1$s];
bool _lf_zero_delay_cycle_upstream_disconnected[%1$s] = { false };
"""
.formatted(numZDCNetworkActions));
} else {
// Make sure these symbols are defined, even though only size will be used.
code.pr(
"""
lf_action_base_t** _lf_zero_delay_cycle_action_table = NULL;
size_t _lf_zero_delay_cycle_action_table_size = 0;
uint16_t* _lf_zero_delay_cycle_upstream_ids = NULL;
bool* _lf_zero_delay_cycle_upstream_disconnected = NULL;
""");
}

int numOfNetworkReactions = federate.networkReceiverReactions.size();
code.pr(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,23 @@
public class CExtensionUtils {

// Regular expression pattern for shared_ptr types.
static final Pattern sharedPointerVariable = Pattern
.compile("^(/\\*.*?\\*/)?std::shared_ptr<(?<type>((/\\*.*?\\*/)?(\\S+))+)>$");
static final Pattern sharedPointerVariable =
Pattern.compile("^(/\\*.*?\\*/)?std::shared_ptr<(?<type>((/\\*.*?\\*/)?(\\S+))+)>$");

/**
* Generate C code that initializes network actions.
*
* <p>
* These network actions will be triggered by federate.c whenever a message is
* received from
* <p>These network actions will be triggered by federate.c whenever a message is received from
* the network.
*
* @param federate The federate.
* @param main The main reactor that contains the federate (used to lookup
* references).
* @param main The main reactor that contains the federate (used to lookup references).
*/
public static String initializeTriggersForNetworkActions(
FederateInstance federate, ReactorInstance main) {
CodeBuilder code = new CodeBuilder();
if (!federate.networkMessageActions.isEmpty()) {
var actionTableCount = 0;
var zeroDelayActionTableCount = 0;
for (int i = 0; i < federate.networkMessageActions.size(); ++i) {
// Find the corresponding ActionInstance.
Action action = federate.networkMessageActions.get(i);
Expand All @@ -76,10 +72,17 @@ public static String initializeTriggersForNetworkActions(
+ "] = (lf_action_base_t*)&"
+ trigger
+ "; \\");
if (federate.zeroDelayCycleNetworkMessageActions.contains(action)) {
int j = federate.zeroDelayCycleNetworkMessageActions.indexOf(action);
if (j >= 0) {
var upstream = federate.zeroDelayCycleNetworkUpstreamFeds.get(j);
code.pr("_lf_zero_delay_cycle_upstream_ids[" + j + "] = " + upstream.id + "; \\");
if (upstream.isTransient) {
// Transient federates are assumed to be initially disconnected.
code.pr("_lf_zero_delay_cycle_upstream_disconnected[" + j + "] = true; \\");
}
code.pr(
"_lf_zero_delay_cycle_action_table["
+ zeroDelayActionTableCount++
+ j
+ "] = (lf_action_base_t*)&"
+ trigger
+ "; \\");
Expand All @@ -92,11 +95,8 @@ public static String initializeTriggersForNetworkActions(
/**
* Generate C code that holds a sorted list of STAA structs by time.
*
* <p>
* For decentralized execution, on every logical timestep, a thread will iterate
* through each
* staa struct, wait for the designated offset time, and set the associated port
* status to absent
* <p>For decentralized execution, on every logical timestep, a thread will iterate through each
* staa struct, wait for the designated offset time, and set the associated port status to absent
* if it isn't known.
*
* @param federate The federate.
Expand All @@ -112,7 +112,8 @@ public static String stpStructs(FederateInstance federate) {
// main reactor for each Action.
for (int i = 0; i < federate.staaOffsets.size(); ++i) {
// Find the corresponding ActionInstance.
List<Action> networkActions = federate.staToNetworkActionMap.get(federate.staaOffsets.get(i));
List<Action> networkActions =
federate.staToNetworkActionMap.get(federate.staaOffsets.get(i));

code.pr("staa_lst[" + i + "] = (staa_t*) malloc(sizeof(staa_t));");
code.pr(
Expand Down Expand Up @@ -145,8 +146,7 @@ public static String stpStructs(FederateInstance federate) {
}

/**
* Create a port status field variable for a network input port "input" in the
* self struct of a
* Create a port status field variable for a network input port "input" in the self struct of a
* reactor.
*
* @param input The network input port
Expand All @@ -167,23 +167,15 @@ public static String createPortStatusFieldForInput(Input input) {
}

/**
* Given a connection 'delay' expression, return a string that represents the
* interval_t value of
* Given a connection 'delay' expression, return a string that represents the interval_t value of
* the additional delay that needs to be applied to the outgoing message.
*
* <p>
* The returned additional delay in absence of after on network connection
* (i.e., if delay is
* passed as a null) is NEVER. This has a special meaning in C library functions
* that send network
* messages that carry timestamps (@see lf_send_tagged_message and
* lf_send_port_absent_to_federate
* in lib/core/federate.c). In this case, the sender will send its current tag
* as the timestamp of
* the outgoing message without adding a microstep delay. If the user has
* assigned an after delay
* to the network connection (that can be zero) either as a time value (e.g.,
* 200 msec) or as a
* <p>The returned additional delay in absence of after on network connection (i.e., if delay is
* passed as a null) is NEVER. This has a special meaning in C library functions that send network
* messages that carry timestamps (@see lf_send_tagged_message and lf_send_port_absent_to_federate
* in lib/core/federate.c). In this case, the sender will send its current tag as the timestamp of
* the outgoing message without adding a microstep delay. If the user has assigned an after delay
* to the network connection (that can be zero) either as a time value (e.g., 200 msec) or as a
* literal (e.g., a parameter), that delay in nsec will be returned.
*
* @param delay The delay associated with a connection.
Expand Down Expand Up @@ -227,9 +219,11 @@ public static void handleCompileDefinitions(
}

private static void handleAdvanceMessageInterval(FederateInstance federate) {
var advanceMessageInterval = federate.targetConfig.get(CoordinationOptionsProperty.INSTANCE).advanceMessageInterval;
var advanceMessageInterval =
federate.targetConfig.get(CoordinationOptionsProperty.INSTANCE).advanceMessageInterval;
if (advanceMessageInterval != null) {
federate.targetConfig
federate
.targetConfig
.get(CompileDefinitionsProperty.INSTANCE)
.put("ADVANCE_MESSAGE_INTERVAL", String.valueOf(advanceMessageInterval.toNanoSeconds()));
}
Expand All @@ -242,15 +236,12 @@ static boolean clockSyncIsOn(FederateInstance federate, RtiConfig rtiConfig) {
}

/**
* Initialize clock synchronization (if enabled) and its related options for a
* given federate.
* Initialize clock synchronization (if enabled) and its related options for a given federate.
*
* <p>
* Clock synchronization can be enabled using the clock-sync target property.
* <p>Clock synchronization can be enabled using the clock-sync target property.
*
* @see <a
* href=
* "https://github.com/icyphy/lingua-franca/wiki/Distributed-Execution#clock-synchronization">Documentation</a>
* @see <a href=
* "https://github.com/icyphy/lingua-franca/wiki/Distributed-Execution#clock-synchronization">Documentation</a>
*/
public static void initializeClockSynchronization(
FederateInstance federate, RtiConfig rtiConfig, MessageReporter messageReporter) {
Expand All @@ -277,15 +268,12 @@ public static void initializeClockSynchronization(
}

/**
* Initialize clock synchronization (if enabled) and its related options for a
* given federate.
* Initialize clock synchronization (if enabled) and its related options for a given federate.
*
* <p>
* Clock synchronization can be enabled using the clock-sync target property.
* <p>Clock synchronization can be enabled using the clock-sync target property.
*
* @see <a
* href=
* "https://github.com/icyphy/lingua-franca/wiki/Distributed-Execution#clock-synchronization">Documentation</a>
* @see <a href=
* "https://github.com/icyphy/lingua-franca/wiki/Distributed-Execution#clock-synchronization">Documentation</a>
*/
public static void addClockSyncCompileDefinitions(FederateInstance federate) {

Expand Down Expand Up @@ -316,9 +304,10 @@ public static void generateCMakeInclude(
FederateInstance federate, FederationFileConfig fileConfig) throws IOException {
Files.createDirectories(fileConfig.getSrcPath().resolve("include"));

Path cmakeIncludePath = fileConfig
.getSrcPath()
.resolve("include" + File.separator + federate.name + "_extension.cmake");
Path cmakeIncludePath =
fileConfig
.getSrcPath()
.resolve("include" + File.separator + federate.name + "_extension.cmake");

CodeBuilder cmakeIncludeCode = new CodeBuilder();

Expand All @@ -329,8 +318,6 @@ public static void generateCMakeInclude(
"add_compile_definitions(LF_PACKAGE_DIRECTORY=\"" + fileConfig.srcPkgPath + "\")");
cmakeIncludeCode.pr(
"add_compile_definitions(LF_SOURCE_GEN_DIRECTORY=\"" + fileConfig.getSrcGenPath() + "\")");
cmakeIncludeCode.pr(
"add_compile_definitions(LF_FEDERATES_BIN_DIRECTORY=\"" + fileConfig.getFedBinPath() + "\")");
try (var srcWriter = Files.newBufferedWriter(cmakeIncludePath)) {
srcWriter.write(cmakeIncludeCode.getCode());
}
Expand All @@ -341,8 +328,7 @@ public static void generateCMakeInclude(
}

/**
* Generate code that sends the neighbor structure message to the RTI. See
* {@code
* Generate code that sends the neighbor structure message to the RTI. See {@code
* MSG_TYPE_NEIGHBOR_STRUCTURE} in {@code federated/net_common.h}.
*
* @param federate The federate that is sending its neighbor structure
Expand Down Expand Up @@ -415,13 +401,14 @@ public static String generateFederateNeighborStructure(FederateInstance federate
// Use NEVER to encode no delay at all.
code.pr("candidate_tmp = NEVER;");
} else {
var delayTime = delay instanceof ParameterReference
// In that case use the default value.
? CTypes.getInstance()
.getTargetTimeExpr(
ASTUtils.getDefaultAsTimeValue(
((ParameterReference) delay).getParameter()))
: CTypes.getInstance().getTargetExpr(delay, InferredType.time());
var delayTime =
delay instanceof ParameterReference
// In that case use the default value.
? CTypes.getInstance()
.getTargetTimeExpr(
ASTUtils.getDefaultAsTimeValue(
((ParameterReference) delay).getParameter()))
: CTypes.getInstance().getTargetExpr(delay, InferredType.time());

code.pr(
String.join(
Expand Down Expand Up @@ -478,27 +465,26 @@ public static String surroundWithIfElseFederated(String insideIf, String insideE
return surroundWithIfFederated(insideIf);
} else {
return """
#ifdef FEDERATED
%s
#else
%s
#endif // FEDERATED
"""
#ifdef FEDERATED
%s
#else
%s
#endif // FEDERATED
"""
.formatted(insideIf, insideElse);
}
}

/**
* Surround {@code code} with blocks to ensure that code only executes if the
* program is
* Surround {@code code} with blocks to ensure that code only executes if the program is
* federated.
*/
public static String surroundWithIfFederated(String code) {
return """
#ifdef FEDERATED
%s
#endif // FEDERATED
"""
#ifdef FEDERATED
%s
#endif // FEDERATED
"""
.formatted(code);
}

Expand All @@ -507,41 +493,39 @@ public static String surroundWithIfElseFederatedCentralized(String insideIf, Str
return surroundWithIfFederatedCentralized(insideIf);
} else {
return """
#ifdef FEDERATED_CENTRALIZED
%s
#else
%s
#endif // FEDERATED_CENTRALIZED
"""
#ifdef FEDERATED_CENTRALIZED
%s
#else
%s
#endif // FEDERATED_CENTRALIZED
"""
.formatted(insideIf, insideElse);
}
}

/**
* Surround {@code code} with blocks to ensure that code only executes if the
* program is federated
* Surround {@code code} with blocks to ensure that code only executes if the program is federated
* and has a centralized coordination.
*/
public static String surroundWithIfFederatedCentralized(String code) {
return """
#ifdef FEDERATED_CENTRALIZED
%s
#endif // FEDERATED_CENTRALIZED
"""
#ifdef FEDERATED_CENTRALIZED
%s
#endif // FEDERATED_CENTRALIZED
"""
.formatted(code);
}

/**
* Surround {@code code} with blocks to ensure that code only executes if the
* program is federated
* Surround {@code code} with blocks to ensure that code only executes if the program is federated
* and has a decentralized coordination.
*/
public static String surroundWithIfFederatedDecentralized(String code) {
return """
#ifdef FEDERATED_DECENTRALIZED
%s
#endif // FEDERATED_DECENTRALIZED
"""
#ifdef FEDERATED_DECENTRALIZED
%s
#endif // FEDERATED_DECENTRALIZED
"""
.formatted(code);
}

Expand All @@ -562,9 +546,7 @@ public static String generateSerializationIncludes(FederateInstance federate) {
return code.getCode();
}

/**
* Generate cmake-include code needed for enabled serializers of the federate.
*/
/** Generate cmake-include code needed for enabled serializers of the federate. */
public static String generateSerializationCMakeExtension(FederateInstance federate) {
CodeBuilder code = new CodeBuilder();
for (SupportedSerializers serializer : federate.enabledSerializers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ private static void addNetworkReceiverReactor(
connection.dstFederate.networkMessageActions.add(networkAction);
connection.dstFederate.networkMessageActionDelays.add(connection.getDefinition().getDelay());
if (connection.srcFederate.isInZeroDelayCycle()
&& connection.getDefinition().getDelay() == null)
&& connection.getDefinition().getDelay() == null) {
connection.dstFederate.zeroDelayCycleNetworkMessageActions.add(networkAction);

connection.dstFederate.zeroDelayCycleNetworkUpstreamFeds.add(connection.srcFederate);
}
// Get the largest STAA for any reaction triggered by the destination port.
TimeValue maxSTAA = findMaxSTAA(connection, coordination);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public Instantiation getInstantiation() {
*/
public List<Action> zeroDelayCycleNetworkMessageActions = new ArrayList<>();

/**
* List of upstream federates corresponding to actions in the zeroDelayCycleNetworkMessageActions
* list.
*/
public List<FederateInstance> zeroDelayCycleNetworkUpstreamFeds = new ArrayList<>();

/**
* A set of federates with which this federate has an inbound connection There will only be one
* physical connection even if federate A has defined multiple physical connections to federate B.
Expand Down
Loading
Loading