Skip to content

Commit

Permalink
Merge pull request #2394 from lf-lang/dataflow
Browse files Browse the repository at this point in the history
Improvements in decentralized coordination
  • Loading branch information
edwardalee authored Aug 8, 2024
2 parents 4341e39 + 9de247b commit 058e1ec
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 69 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/lflang/LinguaFranca.xtext
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ Watchdog:
code=Code;

STP:
'STP' '(' value=Expression ')' code=Code;
('STP' | 'STAA') '(' value=Expression ')' code=Code;

Preamble:
(visibility=Visibility)? 'preamble' code=Code;
Expand Down
12 changes: 0 additions & 12 deletions core/src/main/java/org/lflang/ModelInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.lflang.lf.Parameter;
import org.lflang.lf.ParameterReference;
import org.lflang.lf.Reactor;
import org.lflang.lf.STP;
import org.lflang.target.Target;
import org.lflang.util.FileUtil;

Expand Down Expand Up @@ -78,9 +77,6 @@ public class ModelInfo {
/** The set of deadlines that use a too-large constant to specify their time interval. */
public Set<Deadline> overflowingDeadlines;

/** The set of STP offsets that use a too-large constant to specify their time interval. */
public Set<STP> overflowingSTP;

/**
* The set of parameters used to specify a deadline while having been assigned a default value the
* is too large for this purpose. These parameters are to be reported during validation.
Expand Down Expand Up @@ -171,7 +167,6 @@ private void collectOverflowingNodes() {
this.overflowingAssignments = new HashSet<>();
this.overflowingDeadlines = new HashSet<>();
this.overflowingParameters = new HashSet<>();
this.overflowingSTP = new HashSet<>();

// Visit all deadlines in the model; detect possible overflow.
for (var deadline : filter(toIterable(model.eAllContents()), Deadline.class)) {
Expand All @@ -188,13 +183,6 @@ && detectOverflow(
this.overflowingDeadlines.add(deadline);
}
}
// Visit all STP offsets in the model; detect possible overflow.
for (var stp : filter(toIterable(model.eAllContents()), STP.class)) {
// If the time value overflows, mark this deadline as overflowing.
if (isTooLarge(ASTUtils.getLiteralTimeValue(stp.getValue()))) {
this.overflowingSTP.add(stp);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,9 +684,7 @@ private String generateCodeToInitializeFederate(
"// Initialize the socket mutexes",
"lf_mutex_init(&lf_outbound_socket_mutex);",
"lf_mutex_init(&socket_mutex);",
"lf_cond_init(&lf_port_status_changed, &env->mutex);",
CExtensionUtils.surroundWithIfFederatedDecentralized(
"lf_cond_init(&lf_current_tag_changed, &env->mutex);")));
"lf_cond_init(&lf_port_status_changed, &env->mutex);"));

// Find the STA (A.K.A. the global STP offset) for this federate.
if (federate.targetConfig.get(CoordinationProperty.INSTANCE)
Expand All @@ -696,7 +694,8 @@ private String generateCodeToInitializeFederate(
reactor.getParameters().stream()
.filter(
param ->
param.getName().equalsIgnoreCase("STP_offset")
(param.getName().equalsIgnoreCase("STP_offset")
|| param.getName().equalsIgnoreCase("STA"))
&& (param.getType() == null || param.getType().isTime()))
.findFirst();

Expand All @@ -708,7 +707,7 @@ private String generateCodeToInitializeFederate(
"lf_set_stp_offset(" + CTypes.getInstance().getTargetTimeExpr(globalSTPTV) + ");");
else if (globalSTP instanceof CodeExprImpl)
code.pr("lf_set_stp_offset(" + ((CodeExprImpl) globalSTP).getCode().getBody() + ");");
else messageReporter.at(stpParam.get().eContainer()).error("Invalid STP offset");
else messageReporter.at(stpParam.get().eContainer()).error("Invalid STA offset");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static String initializeTriggersForNetworkActions(
}

/**
* Generate C code that holds a sorted list of STP structs by time.
* Generate C code that holds a sorted list of STAA structs by time.
*
* <p>For decentralized execution, on every logical timestep, a thread will iterate through each
* staa struct, wait for the designated offset time, and set the associated port status to absent
Expand All @@ -107,7 +107,7 @@ public static String stpStructs(FederateInstance federate) {
for (int i = 0; i < federate.staaOffsets.size(); ++i) {
// Find the corresponding ActionInstance.
List<Action> networkActions =
federate.stpToNetworkActionMap.get(federate.staaOffsets.get(i));
federate.staToNetworkActionMap.get(federate.staaOffsets.get(i));

code.pr("staa_lst[" + i + "] = (staa_t*) malloc(sizeof(staa_t));");
code.pr(
Expand Down
23 changes: 12 additions & 11 deletions core/src/main/java/org/lflang/federated/generator/FedASTUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,30 +290,31 @@ private static void addNetworkReceiverReactor(
connection.dstFederate.zeroDelayCycleNetworkMessageActions.add(networkAction);

// Get the largest STAA for any reaction triggered by the destination port.
TimeValue maxSTP = findMaxSTP(connection, coordination);
TimeValue maxSTAA = findMaxSTAA(connection, coordination);

// Adjust this down by the delay on the connection, but do not go below zero.
TimeValue adjusted = maxSTP;
TimeValue adjusted = maxSTAA;
TimeValue delay = ASTUtils.getLiteralTimeValue(connection.getDefinition().getDelay());
if (delay != null) {
adjusted = maxSTP.subtract(delay);
adjusted = maxSTAA.subtract(delay);
}

if (!connection.dstFederate.currentSTPOffsets.contains(adjusted.time)) {
connection.dstFederate.currentSTPOffsets.add(adjusted.time);
// Need to include even zero STAAs so that ports can be assumed absent right away.
// Consolodate all equal STAAs.
if (!connection.dstFederate.currentSTAOffsets.contains(adjusted.time)) {
connection.dstFederate.currentSTAOffsets.add(adjusted.time);
connection.dstFederate.staaOffsets.add(adjusted);
connection.dstFederate.stpToNetworkActionMap.put(adjusted, new ArrayList<>());
connection.dstFederate.staToNetworkActionMap.put(adjusted, new ArrayList<>());
} else {
// TODO: Find more efficient way to reuse timevalues
for (var offset : connection.dstFederate.staaOffsets) {
if (maxSTP.time == offset.time) {
maxSTP = offset;
if (maxSTAA.time == offset.time) {
maxSTAA = offset;
break;
}
}
}

connection.dstFederate.stpToNetworkActionMap.get(adjusted).add(networkAction);
connection.dstFederate.staToNetworkActionMap.get(adjusted).add(networkAction);

// Add the action definition to the parent reactor.
receiver.getActions().add(networkAction);
Expand Down Expand Up @@ -521,7 +522,7 @@ private static void followReactionUpstream(
* @param coordination The coordination scheme.
* @return The maximum STP as a TimeValue
*/
private static TimeValue findMaxSTP(
private static TimeValue findMaxSTAA(
FedConnectionInstance connection, CoordinationMode coordination) {
Variable port = connection.getDestinationPortInstance().getDefinition();
FederateInstance instance = connection.dstFederate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ public Instantiation getInstantiation() {
/** Keep a unique list of enabled serializers */
public List<TimeValue> staaOffsets = new ArrayList<>();

/** The STP offsets that have been recorded in {@code stpOffsets thus far. */
public Set<Long> currentSTPOffsets = new HashSet<>();
/** The STA offsets that have been recorded thus far. */
public Set<Long> currentSTAOffsets = new HashSet<>();

/** Keep a map of STP values to a list of network actions */
public HashMap<TimeValue, List<Action>> stpToNetworkActionMap = new HashMap<>();
public HashMap<TimeValue, List<Action>> staToNetworkActionMap = new HashMap<>();

/** Keep a map of network actions to their associated instantiations */
public HashMap<Action, Instantiation> networkActionToInstantiation = new HashMap<>();
Expand Down
10 changes: 0 additions & 10 deletions core/src/main/java/org/lflang/validation/LFValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import org.lflang.lf.Reaction;
import org.lflang.lf.Reactor;
import org.lflang.lf.ReactorDecl;
import org.lflang.lf.STP;
import org.lflang.lf.Serializer;
import org.lflang.lf.StateVar;
import org.lflang.lf.TargetDecl;
Expand Down Expand Up @@ -1036,15 +1035,6 @@ public void checkState(StateVar stateVar) {
}
}

@Check(CheckType.FAST)
public void checkSTPOffset(STP stp) {
if (isCBasedTarget() && this.info.overflowingSTP.contains(stp)) {
error(
"STP offset exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.",
Literals.STP__VALUE);
}
}

@Check(CheckType.FAST)
public void checkTargetDecl(TargetDecl target) throws IOException {
Optional<Target> targetOpt = Target.forName(target.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1904,34 +1904,21 @@ public void testCppMutableInput() throws Exception {
+ "In C++, any value can be made mutable by calling get_mutable_copy().");
}

@Test
public void testOverflowingSTP() throws Exception {
String testCase =
"""
target C;
main reactor {
reaction(startup) {==} STP(2147483648) {==}
}
""";

// TODO: Uncomment and fix failing test. See issue #903 on Github.
// validator.assertError(parseWithoutError(testCase), LfPackage.eINSTANCE.getSTP(), null,
// "STP offset exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.");
}

@Test
public void testOverflowingDeadline() throws Exception {
String testCase =
"""
target C;
main reactor {
reaction(startup) {==} STP(2147483648) {==}
reaction(startup) {==} deadline (1 week) {==}
}
""";

// TODO: Uncomment and fix failing test. See issue #903 on Github.
// validator.assertError(parseWithoutError(testCase), LfPackage.eINSTANCE.getDeadline(), null,
// "Deadline exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.");
validator.assertError(
parseWithoutError(testCase),
LfPackage.eINSTANCE.getDeadline(),
null,
"Deadline exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.");
}

@Test
Expand Down
64 changes: 64 additions & 0 deletions test/C/src/federated/Dataflow.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
target C {
coordination: decentralized // logging: debug
}

reactor Client(STP_offset: time = 1 day) {
input server_message: int
output client_message: int

reaction(startup) {=
lf_print("Client Startup!");
=}

reaction(server_message) -> client_message {=
int val = server_message->value + 1;
lf_sleep(MSEC(100));
lf_print("client: %d", val);
if (val == 9) {
lf_print("client requesting stop");
lf_request_stop();
}
lf_set(client_message, val);
=} STP(0) {=
// Zero STAA because STA is large and gets added.
lf_print_error_and_exit("Client STP Violated!");
=}
}

reactor Server(STP_offset: time = 1 day) {
output server_message: int
input client_message1: int
input client_message2: int

reaction(startup) -> server_message {=
lf_print("Server Startup!");
lf_set(server_message, 0);
=}

reaction(client_message1, client_message2) -> server_message {=
int val = client_message1->value;
if (val < client_message2->value) val = client_message2->value;
lf_sleep(MSEC(100));
val += 1;
lf_print("server: %d", val);
if (val == 8) {
lf_print("server requesting stop");
lf_set(server_message, val);
lf_request_stop();
}
lf_set(server_message, val);
=} STP(0) {=
// Zero STAA because STA is large and gets added.
lf_print_error_and_exit("Server STP Violated!");
=}
}

federated reactor {
client1 = new Client()
client2 = new Client()
server = new Server()
server.server_message -> client1.server_message
client1.client_message -> server.client_message1 after 0
server.server_message -> client2.server_message
client2.client_message -> server.client_message2 after 0
}
68 changes: 68 additions & 0 deletions test/C/src/federated/DecentralizedLagging.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* @brief A federated system with a decentralized coordinator that has large STA and STAA offsets.
*
* This test verifies that a large STA and STAA offset can be used when the data flow is predictable
* even if the the program is lagging behind physical time.
*
* @author Edward A. Lee
*/
target C {
coordination: decentralized,
timeout: 40 ns
}

// Pi needs an STP_offset because its event queue is usually empty and otherwise it will advance to the stop time.
reactor Pi(STP_offset: time = 1 day) {
input trigger: bool
output out: int

reaction(trigger) -> out {=
tag_t now = lf_tag();
lf_print("***** at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep);
lf_set(out, 42);
=} STP(30 s) {=
tag_t now = lf_tag();
lf_print_error_and_exit("STP violation at Pi at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep);
=}
}

// Gather doesn't need an STP_offset because its event queue is never empty.
reactor Gather {
input[4] in: int
output next: bool
logical action a(10 ns)
state count: int = 0

reaction(startup, a) -> next {=
lf_set(next, true);
=}

reaction(in) -> a {=
tag_t now = lf_tag();
for (int i = 0; i < 4; i++) {
if (!in[i]->is_present) {
lf_print_error_and_exit("Missing input %d in Gather at tag " PRINTF_TAG,
i, now.time - lf_time_start(), now.microstep);
}
}
lf_print("%d: at tag " PRINTF_TAG, self->count, now.time - lf_time_start(), now.microstep);
self->count++;
lf_schedule(a, 0);
=} STP(1 day) {=
tag_t now = lf_tag();
lf_print_error_and_exit("STP violation at Gather at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep);
=}

reaction(shutdown) {=
if (self->count < 5) {
lf_print_error_and_exit("Gather received only %d inputs. Expected 5.", self->count);
}
=}
}

federated reactor {
pi = new[4] Pi()
g = new Gather()
pi.out -> g.in
(g.next)+ -> pi.trigger
}
Loading

0 comments on commit 058e1ec

Please sign in to comment.