Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements in decentralized coordination #2394

Merged
merged 27 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cbc2970
Added tests for dataflow branch of reactor-c
edwardalee Aug 3, 2024
d1a1c2b
Format and align reactor-c
edwardalee Aug 3, 2024
e0f5943
Merge branch 'master' into dataflow
edwardalee Aug 3, 2024
c1ab112
Align reactor-c
edwardalee Aug 3, 2024
39389ad
Keep producing output after calling lf_stop
edwardalee Aug 3, 2024
362642f
Grammar fix
edwardalee Aug 3, 2024
afc6ad5
Allow code expressions for time values
edwardalee Aug 3, 2024
7e2621e
Added test similar to Python version
edwardalee Aug 3, 2024
5dd7b9e
Align reactor-c
edwardalee Aug 3, 2024
6c5c643
Set STAA to 0 and align reactor-c
edwardalee Aug 4, 2024
b328760
Align reactor-c
edwardalee Aug 4, 2024
ef085e6
Align reactor-c
edwardalee Aug 4, 2024
bf6c1c1
Formatter doesn't like my comment
edwardalee Aug 4, 2024
50c6e01
Rename STP_offset to STA and STP to STAA
edwardalee Aug 5, 2024
71560df
Format
edwardalee Aug 5, 2024
5d7b5c5
Revert to not allowing code expressions for time values
edwardalee Aug 5, 2024
82e5547
Merge branch 'master' into dataflow
edwardalee Aug 5, 2024
0da32ed
Remove codeblock for STA
edwardalee Aug 5, 2024
d0843ea
Removed unused condition variable
edwardalee Aug 5, 2024
acd7898
Do not include zero STAAs in waiting algorithm
edwardalee Aug 5, 2024
1dde3d3
Removed bogus test which was commented out anyway
edwardalee Aug 5, 2024
5311e9f
Remove bogus limiting of STP offset
edwardalee Aug 5, 2024
19a650f
Fix and uncomment test of overflowing deadline
edwardalee Aug 5, 2024
470e9d0
Revert exclusion of zero STAAs. They are needed.
edwardalee Aug 5, 2024
da8d16a
Spotless
edwardalee Aug 5, 2024
ce11a80
Added required STA
edwardalee Aug 6, 2024
9de247b
Spotless
edwardalee Aug 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/java/org/lflang/LinguaFranca.xtext
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ Watchdog:
code=Code;

STP:
'STP' '(' value=Expression ')' code=Code;
('STP' | 'STAA') '(' value=Expression ')' code=Code;
lhstrh marked this conversation as resolved.
Show resolved Hide resolved

Preamble:
(visibility=Visibility)? 'preamble' code=Code;
Expand Down
12 changes: 0 additions & 12 deletions core/src/main/java/org/lflang/ModelInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.lflang.lf.Parameter;
import org.lflang.lf.ParameterReference;
import org.lflang.lf.Reactor;
import org.lflang.lf.STP;
import org.lflang.target.Target;
import org.lflang.util.FileUtil;

Expand Down Expand Up @@ -78,9 +77,6 @@ public class ModelInfo {
/** The set of deadlines that use a too-large constant to specify their time interval. */
public Set<Deadline> overflowingDeadlines;

/** The set of STP offsets that use a too-large constant to specify their time interval. */
public Set<STP> overflowingSTP;

/**
* The set of parameters used to specify a deadline while having been assigned a default value the
* is too large for this purpose. These parameters are to be reported during validation.
Expand Down Expand Up @@ -171,7 +167,6 @@ private void collectOverflowingNodes() {
this.overflowingAssignments = new HashSet<>();
this.overflowingDeadlines = new HashSet<>();
this.overflowingParameters = new HashSet<>();
this.overflowingSTP = new HashSet<>();

// Visit all deadlines in the model; detect possible overflow.
for (var deadline : filter(toIterable(model.eAllContents()), Deadline.class)) {
Expand All @@ -188,13 +183,6 @@ && detectOverflow(
this.overflowingDeadlines.add(deadline);
}
}
// Visit all STP offsets in the model; detect possible overflow.
for (var stp : filter(toIterable(model.eAllContents()), STP.class)) {
// If the time value overflows, mark this deadline as overflowing.
if (isTooLarge(ASTUtils.getLiteralTimeValue(stp.getValue()))) {
this.overflowingSTP.add(stp);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,9 +684,7 @@ private String generateCodeToInitializeFederate(
"// Initialize the socket mutexes",
"lf_mutex_init(&lf_outbound_socket_mutex);",
"lf_mutex_init(&socket_mutex);",
"lf_cond_init(&lf_port_status_changed, &env->mutex);",
CExtensionUtils.surroundWithIfFederatedDecentralized(
"lf_cond_init(&lf_current_tag_changed, &env->mutex);")));
"lf_cond_init(&lf_port_status_changed, &env->mutex);"));

// Find the STA (A.K.A. the global STP offset) for this federate.
if (federate.targetConfig.get(CoordinationProperty.INSTANCE)
Expand All @@ -696,7 +694,8 @@ private String generateCodeToInitializeFederate(
reactor.getParameters().stream()
.filter(
param ->
param.getName().equalsIgnoreCase("STP_offset")
(param.getName().equalsIgnoreCase("STP_offset")
|| param.getName().equalsIgnoreCase("STA"))
&& (param.getType() == null || param.getType().isTime()))
.findFirst();

Expand All @@ -708,7 +707,7 @@ private String generateCodeToInitializeFederate(
"lf_set_stp_offset(" + CTypes.getInstance().getTargetTimeExpr(globalSTPTV) + ");");
else if (globalSTP instanceof CodeExprImpl)
code.pr("lf_set_stp_offset(" + ((CodeExprImpl) globalSTP).getCode().getBody() + ");");
else messageReporter.at(stpParam.get().eContainer()).error("Invalid STP offset");
else messageReporter.at(stpParam.get().eContainer()).error("Invalid STA offset");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static String initializeTriggersForNetworkActions(
}

/**
* Generate C code that holds a sorted list of STP structs by time.
* Generate C code that holds a sorted list of STAA structs by time.
*
* <p>For decentralized execution, on every logical timestep, a thread will iterate through each
* staa struct, wait for the designated offset time, and set the associated port status to absent
Expand All @@ -107,7 +107,7 @@ public static String stpStructs(FederateInstance federate) {
for (int i = 0; i < federate.staaOffsets.size(); ++i) {
// Find the corresponding ActionInstance.
List<Action> networkActions =
federate.stpToNetworkActionMap.get(federate.staaOffsets.get(i));
federate.staToNetworkActionMap.get(federate.staaOffsets.get(i));

code.pr("staa_lst[" + i + "] = (staa_t*) malloc(sizeof(staa_t));");
code.pr(
Expand Down
23 changes: 12 additions & 11 deletions core/src/main/java/org/lflang/federated/generator/FedASTUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,30 +290,31 @@ private static void addNetworkReceiverReactor(
connection.dstFederate.zeroDelayCycleNetworkMessageActions.add(networkAction);

// Get the largest STAA for any reaction triggered by the destination port.
TimeValue maxSTP = findMaxSTP(connection, coordination);
TimeValue maxSTAA = findMaxSTAA(connection, coordination);

// Adjust this down by the delay on the connection, but do not go below zero.
TimeValue adjusted = maxSTP;
TimeValue adjusted = maxSTAA;
TimeValue delay = ASTUtils.getLiteralTimeValue(connection.getDefinition().getDelay());
if (delay != null) {
adjusted = maxSTP.subtract(delay);
adjusted = maxSTAA.subtract(delay);
}

if (!connection.dstFederate.currentSTPOffsets.contains(adjusted.time)) {
connection.dstFederate.currentSTPOffsets.add(adjusted.time);
// Need to include even zero STAAs so that ports can be assumed absent right away.
// Consolodate all equal STAAs.
if (!connection.dstFederate.currentSTAOffsets.contains(adjusted.time)) {
connection.dstFederate.currentSTAOffsets.add(adjusted.time);
connection.dstFederate.staaOffsets.add(adjusted);
connection.dstFederate.stpToNetworkActionMap.put(adjusted, new ArrayList<>());
connection.dstFederate.staToNetworkActionMap.put(adjusted, new ArrayList<>());
} else {
// TODO: Find more efficient way to reuse timevalues
for (var offset : connection.dstFederate.staaOffsets) {
if (maxSTP.time == offset.time) {
maxSTP = offset;
if (maxSTAA.time == offset.time) {
maxSTAA = offset;
break;
}
}
}

connection.dstFederate.stpToNetworkActionMap.get(adjusted).add(networkAction);
connection.dstFederate.staToNetworkActionMap.get(adjusted).add(networkAction);

// Add the action definition to the parent reactor.
receiver.getActions().add(networkAction);
Expand Down Expand Up @@ -521,7 +522,7 @@ private static void followReactionUpstream(
* @param coordination The coordination scheme.
* @return The maximum STP as a TimeValue
*/
private static TimeValue findMaxSTP(
private static TimeValue findMaxSTAA(
FedConnectionInstance connection, CoordinationMode coordination) {
Variable port = connection.getDestinationPortInstance().getDefinition();
FederateInstance instance = connection.dstFederate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ public Instantiation getInstantiation() {
/** Keep a unique list of enabled serializers */
public List<TimeValue> staaOffsets = new ArrayList<>();

/** The STP offsets that have been recorded in {@code stpOffsets thus far. */
public Set<Long> currentSTPOffsets = new HashSet<>();
/** The STA offsets that have been recorded thus far. */
public Set<Long> currentSTAOffsets = new HashSet<>();

/** Keep a map of STP values to a list of network actions */
public HashMap<TimeValue, List<Action>> stpToNetworkActionMap = new HashMap<>();
public HashMap<TimeValue, List<Action>> staToNetworkActionMap = new HashMap<>();

/** Keep a map of network actions to their associated instantiations */
public HashMap<Action, Instantiation> networkActionToInstantiation = new HashMap<>();
Expand Down
10 changes: 0 additions & 10 deletions core/src/main/java/org/lflang/validation/LFValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import org.lflang.lf.Reaction;
import org.lflang.lf.Reactor;
import org.lflang.lf.ReactorDecl;
import org.lflang.lf.STP;
import org.lflang.lf.Serializer;
import org.lflang.lf.StateVar;
import org.lflang.lf.TargetDecl;
Expand Down Expand Up @@ -1036,15 +1035,6 @@ public void checkState(StateVar stateVar) {
}
}

@Check(CheckType.FAST)
lhstrh marked this conversation as resolved.
Show resolved Hide resolved
public void checkSTPOffset(STP stp) {
if (isCBasedTarget() && this.info.overflowingSTP.contains(stp)) {
error(
"STP offset exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.",
Literals.STP__VALUE);
}
}

@Check(CheckType.FAST)
public void checkTargetDecl(TargetDecl target) throws IOException {
Optional<Target> targetOpt = Target.forName(target.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1904,34 +1904,21 @@ public void testCppMutableInput() throws Exception {
+ "In C++, any value can be made mutable by calling get_mutable_copy().");
}

@Test
public void testOverflowingSTP() throws Exception {
String testCase =
"""
target C;
main reactor {
reaction(startup) {==} STP(2147483648) {==}
}
""";

// TODO: Uncomment and fix failing test. See issue #903 on Github.
// validator.assertError(parseWithoutError(testCase), LfPackage.eINSTANCE.getSTP(), null,
// "STP offset exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.");
}

@Test
public void testOverflowingDeadline() throws Exception {
String testCase =
"""
target C;
main reactor {
reaction(startup) {==} STP(2147483648) {==}
reaction(startup) {==} deadline (1 week) {==}
}
""";

// TODO: Uncomment and fix failing test. See issue #903 on Github.
// validator.assertError(parseWithoutError(testCase), LfPackage.eINSTANCE.getDeadline(), null,
// "Deadline exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.");
validator.assertError(
parseWithoutError(testCase),
LfPackage.eINSTANCE.getDeadline(),
null,
"Deadline exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.");
}

@Test
Expand Down
64 changes: 64 additions & 0 deletions test/C/src/federated/Dataflow.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
target C {
coordination: decentralized // logging: debug
}

reactor Client(STP_offset: time = 1 day) {
input server_message: int
output client_message: int

reaction(startup) {=
lf_print("Client Startup!");
=}

reaction(server_message) -> client_message {=
int val = server_message->value + 1;
lf_sleep(MSEC(100));
lf_print("client: %d", val);
if (val == 9) {
lf_print("client requesting stop");
lf_request_stop();
}
lf_set(client_message, val);
=} STP(0) {=
// Zero STAA because STA is large and gets added.
lf_print_error_and_exit("Client STP Violated!");
=}
}

reactor Server(STP_offset: time = 1 day) {
output server_message: int
input client_message1: int
input client_message2: int

reaction(startup) -> server_message {=
lf_print("Server Startup!");
lf_set(server_message, 0);
=}

reaction(client_message1, client_message2) -> server_message {=
int val = client_message1->value;
if (val < client_message2->value) val = client_message2->value;
lf_sleep(MSEC(100));
val += 1;
lf_print("server: %d", val);
if (val == 8) {
lf_print("server requesting stop");
lf_set(server_message, val);
lf_request_stop();
}
lf_set(server_message, val);
=} STP(0) {=
// Zero STAA because STA is large and gets added.
lf_print_error_and_exit("Server STP Violated!");
=}
}

federated reactor {
client1 = new Client()
client2 = new Client()
server = new Server()
server.server_message -> client1.server_message
client1.client_message -> server.client_message1 after 0
server.server_message -> client2.server_message
client2.client_message -> server.client_message2 after 0
}
68 changes: 68 additions & 0 deletions test/C/src/federated/DecentralizedLagging.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* @brief A federated system with a decentralized coordinator that has large STA and STAA offsets.
*
* This test verifies that a large STA and STAA offset can be used when the data flow is predictable
* even if the the program is lagging behind physical time.
*
* @author Edward A. Lee
*/
target C {
coordination: decentralized,
timeout: 40 ns
}

// Pi needs an STP_offset because its event queue is usually empty and otherwise it will advance to the stop time.
reactor Pi(STP_offset: time = 1 day) {
input trigger: bool
output out: int

reaction(trigger) -> out {=
tag_t now = lf_tag();
lf_print("***** at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep);
lf_set(out, 42);
=} STP(30 s) {=
tag_t now = lf_tag();
lf_print_error_and_exit("STP violation at Pi at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep);
=}
}

// Gather doesn't need an STP_offset because its event queue is never empty.
reactor Gather {
input[4] in: int
output next: bool
logical action a(10 ns)
state count: int = 0

reaction(startup, a) -> next {=
lf_set(next, true);
=}

reaction(in) -> a {=
tag_t now = lf_tag();
for (int i = 0; i < 4; i++) {
if (!in[i]->is_present) {
lf_print_error_and_exit("Missing input %d in Gather at tag " PRINTF_TAG,
i, now.time - lf_time_start(), now.microstep);
}
}
lf_print("%d: at tag " PRINTF_TAG, self->count, now.time - lf_time_start(), now.microstep);
self->count++;
lf_schedule(a, 0);
=} STP(1 day) {=
tag_t now = lf_tag();
lf_print_error_and_exit("STP violation at Gather at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep);
=}

reaction(shutdown) {=
if (self->count < 5) {
lf_print_error_and_exit("Gather received only %d inputs. Expected 5.", self->count);
}
=}
}

federated reactor {
pi = new[4] Pi()
g = new Gather()
pi.out -> g.in
(g.next)+ -> pi.trigger
}
Loading
Loading