Skip to content

Commit

Permalink
Merge pull request #1925 from lf-lang/ts-cyclic-dependencies
Browse files Browse the repository at this point in the history
Handling cyclic dependencies for TypeScript federated execution
  • Loading branch information
hokeun authored Aug 25, 2023
2 parents 8505fb5 + b33ce99 commit 8bd0390
Show file tree
Hide file tree
Showing 18 changed files with 300 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public String generateNetworkSenderBody(
CoordinationType coordinationType,
MessageReporter messageReporter) {
return """
if (%1$s%2$s[sender_index as number] !== undefined) {
this.util.sendRTITimedMessage(%1$s%2$s[sender_index as number], %3$s, %4$s, %5$s);
if (%1$s%2$s[0] !== undefined) {
this.util.sendRTITimedMessage(%1$s%2$s[0], %3$s, %4$s, %5$s);
}
"""
.formatted(
Expand All @@ -136,7 +136,7 @@ public String generatePortAbsentReactionBody(
return """
// If the output port has not been set for the current logical time,
// send an ABSENT message to the receiving federate
if (%1$s%2$s === undefined) {
if (%1$s%2$s[0] === undefined) {
this.util.sendRTIPortAbsent(%3$d, %4$d, %5$s);
}
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ private static void addNetworkReceiverReactor(
.getParent()
.reactorDefinition; // Top-level reactor.

// Add the attribute "_NetworkReactor" for the network receiver.
// Add the attribute "_networkReactor" for the network receiver.
var a = factory.createAttribute();
a.setAttrName("_NetworkReactor");
a.setAttrName("_networkReactor");
var e = factory.createAttrParm();
e.setValue("Receiver");
e.setValue("\"receiver\"");
a.getAttrParms().add(e);
receiver.getAttributes().add(a);

Expand Down Expand Up @@ -646,11 +646,11 @@ private static Reactor getNetworkSenderReactor(
// Initialize Reactor and Reaction AST Nodes
Reactor sender = factory.createReactor();

// Add the attribute "_NetworkReactor" for the network sender.
// Add the attribute "_networkReactor" for the network sender.
var a = factory.createAttribute();
a.setAttrName("_NetworkReactor");
a.setAttrName("_networkReactor");
var e = factory.createAttrParm();
e.setValue("Sender");
e.setValue("\"sender\"");
a.getAttrParms().add(e);
sender.getAttributes().add(a);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ enum AttrParamType {
"_tpoLevel",
new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.INT, false))));
ATTRIBUTE_SPECS_BY_NAME.put(
"_NetworkReactor",
"_networkReactor",
new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.STRING, false))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TSConstructorGenerator(
private fun initializeParameter(p: Parameter): String =
"${p.name}: ${TSTypes.getInstance().getTargetType(p)} = ${TSTypes.getInstance().getTargetInitializer(p)}"

private fun generateConstructorArguments(reactor: Reactor): String {
private fun generateConstructorArguments(reactor: Reactor, isNetworkReactor: Boolean): String {
val arguments = StringJoiner(", \n")
if (reactor.isMain || reactor.isFederated) {
arguments.add("timeout: TimeValue | undefined = undefined")
Expand All @@ -46,10 +46,14 @@ class TSConstructorGenerator(
arguments.add("fail?: () => void")
}

if (isNetworkReactor) {
arguments.add("tpoLevel?: number")
}

return arguments.toString()
}

private fun generateSuperConstructorCall(reactor: Reactor, isFederate: Boolean): String =
private fun generateSuperConstructorCall(reactor: Reactor, isFederate: Boolean, isNetworkReactor: Boolean): String =
if (reactor.isMain) {
if (isFederate) {
"""
Expand All @@ -65,7 +69,11 @@ class TSConstructorGenerator(
"super(timeout, keepAlive, fast, success, fail);"
}
} else {
"super(parent);"
if (isNetworkReactor) {
"super(parent, tpoLevel)"
} else {
"super(parent);"
}
}

// Generate code for setting target configurations.
Expand All @@ -85,6 +93,7 @@ class TSConstructorGenerator(
actions: TSActionGenerator,
ports: TSPortGenerator,
isFederate: Boolean,
isNetworkReactor: Boolean,
isNetworkReceiver: Boolean
): String {
val connections = TSConnectionGenerator(reactor.connections, messageReporter)
Expand All @@ -93,9 +102,9 @@ class TSConstructorGenerator(
return with(PrependOperator) {
"""
|constructor (
${" | "..generateConstructorArguments(reactor)}
${" | "..generateConstructorArguments(reactor, isNetworkReactor)}
|) {
${" | "..generateSuperConstructorCall(reactor, isFederate)}
${" | "..generateSuperConstructorCall(reactor, isFederate, isNetworkReactor)}
${" | "..generateTargetConfigurations(targetConfig)}
${" | "..instances.generateInstantiations()}
${" | "..timers.generateInstantiations()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,20 @@ class TSInstanceGenerator(
val childReactorInstantiations = LinkedList<String>()
var portID = 0
for (childReactor in childReactors) {
var isNetworkSender = false
var isNetworkReceiver = false
val networkReactorAttribute = AttributeUtils.findAttributeByName(childReactor.reactorClass, "_NetworkReactor")
if (networkReactorAttribute != null) {
isNetworkSender = networkReactorAttribute.getAttrParms().get(0).getName() == "Sender"
isNetworkReceiver = networkReactorAttribute.getAttrParms().get(0).getName() == "Receiver"
}
var tpoLevel = AttributeUtils.getFirstArgumentValue(AttributeUtils.findAttributeByName(childReactor, "_tpoLevel"));
val networkReactorAttributeValue = AttributeUtils.getFirstArgumentValue(AttributeUtils.findAttributeByName(childReactor.reactorClass, "_networkReactor"))
var isNetworkReceiver = networkReactorAttributeValue == "receiver"
var isNetworkSender = networkReactorAttributeValue == "sender"

val childReactorArguments = StringJoiner(", ")
childReactorArguments.add("this")

for (parameter in childReactor.reactorClass.toDefinition().parameters) {
childReactorArguments.add(TSTypes.getInstance().getTargetInitializer(parameter, childReactor))
}
if (tpoLevel != null) {
childReactorArguments.add(tpoLevel.toString());
}
if (childReactor.isBank) {
childReactorInstantiations.add(
"this.${childReactor.name} = " +
Expand All @@ -77,7 +78,7 @@ class TSInstanceGenerator(
// Assume that network receiver reactors are sorted by portID
childReactorInstantiations.add(
"this.registerNetworkReceiver(\n"
+ "\t${portID},\n"
+ "\t${portID++},\n"
+ "\tthis.${childReactor.name} as __NetworkReceiver<unknown>\n)")
}
if (isNetworkSender) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,10 @@ ${" |"..preamble.code.toText()}
}

val isFederate = AttributeUtils.isFederate(reactor)
val networkReactorAttribute = AttributeUtils.findAttributeByName(reactor, "_NetworkReactor")
var isNetworkSender = false
var isNetworkReceiver = false
if (networkReactorAttribute != null) {
isNetworkSender = networkReactorAttribute.getAttrParms().get(0).getName() == "Sender"
isNetworkReceiver = networkReactorAttribute.getAttrParms().get(0).getName() == "Receiver"
}
val networkReactorAttributeValue = AttributeUtils.getFirstArgumentValue(AttributeUtils.findAttributeByName(reactor, "_networkReactor"))
var isNetworkReactor = networkReactorAttributeValue != null
var isNetworkReceiver = networkReactorAttributeValue == "receiver"
var isNetworkSender = networkReactorAttributeValue == "sender"

// NOTE: type parameters that are referenced in ports or actions must extend
// Present in order for the program to type check.
Expand Down Expand Up @@ -144,7 +141,7 @@ ${" |"..preamble.code.toText()}
${" | "..actionGenerator.generateClassProperties()}
${" | "..portGenerator.generateClassProperties()}
${" | "..constructorGenerator.generateConstructor(targetConfig, instanceGenerator, timerGenerator, parameterGenerator,
stateGenerator, actionGenerator, portGenerator, isFederate, isNetworkReceiver)}
stateGenerator, actionGenerator, portGenerator, isFederate, isNetworkReactor, isNetworkReceiver)}
|}
|// =============== END reactor class ${reactor.name}
|
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/lib/ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "LinguaFrancaDefault",
"type": "commonjs",
"dependencies": {
"@lf-lang/reactor-ts": "^0.5.0",
"@lf-lang/reactor-ts": "^0.6.0",
"command-line-args": "^5.1.1",
"command-line-usage": "^6.1.3"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Test a distributed system where a federation receives messages only over connections that are
* marked 'physical' (using the ~> arrow) with an after delay. The receiver verifies that the after
* delay is correctly imposed.
*
* @author Edward A. Lee
* @author Soroush Bateni
* @author Byeong-gil Jun
*/
target TypeScript

import Count from "../lib/Count.lf"

reactor Print {
input inp: number
state c: number = 1

reaction(inp) {=
const elapsedTime = util.getElapsedLogicalTime();
console.log(`At time ${elapsedTime}, received ${inp}`);
if (inp !== c) {
util.requestErrorStop(`ERROR: Expected to receive ${c}.`);
}
if (!elapsedTime.isLaterThan(TimeValue.msec(600))) {
util.requestErrorStop(`ERROR: Expected received time to be strictly greater than ${TimeValue.msec(600)}`);
}
c++;
console.log(`c = ${c}`);
util.requestStop();
=}

reaction(shutdown) {=
if (c !== 2) {
util.requestErrorStop(`ERROR: Expected to receive 1 item. Received ${c - 1}.`);
} else {
console.log("SUCCESS: Successfully received 1 item.");
}
=}
}

federated reactor at localhost {
c = new Count(offset = 200 msec, period=0)
p = new Print()
c.out ~> p.inp after 400 msec // Indicating a 'physical' connection with a 400 msec after delay.
}
64 changes: 64 additions & 0 deletions test/TypeScript/src/federated/LoopDistributedCentralized.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* This tests a feedback loop with physical actions and centralized coordination.
*
* @author Edward A. Lee
* @author Hokeun Kim
*/
target TypeScript {
timeout: 5 sec
}

reactor Looper(incr: number = 1, delay: time = 0 msec) {
input inp: number
output out: number
physical action a(delay)
state count: number = 0

preamble {=
let stop = false;
// Function to trigger an action once every second.
function ping(act: any) {
if (!stop) {
console.log("Scheduling action.");
act.schedule(0, null);
setTimeout(ping, 1000, act);
}
}
=}

reaction(startup) -> a {=
// Start the ping function for triggering an action every second.
console.log("Starting ping function.");
ping(actions.a);
=}

reaction(a) -> out {=
out = count;
count += incr;
=}

reaction(inp) {=
let logical = util.getCurrentLogicalTime();
let physical = util.getCurrentPhysicalTime();

let time_lag = physical.subtract(logical);

console.log("Received " + inp + ". Logical time is behind physical time by " + time_lag + ".");
=}

reaction(shutdown) {=
console.log("******* Shutdown invoked.");
// Stop the ping function that is scheduling actions.
stop = true;
if (count != 5 * incr) {
util.requestErrorStop("Failed to receive all five expected inputs.");
}
=}
}

federated reactor LoopDistributedCentralized(delay: time = 0) {
left = new Looper()
right = new Looper(incr=-1)
left.out -> right.inp
right.out -> left.inp
}
79 changes: 79 additions & 0 deletions test/TypeScript/src/federated/LoopDistributedDouble.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* This tests a feedback loop with physical actions and centralized coordination.
*
* @author Edward A. Lee
* @author Hokeun Kim
*/
target TypeScript {
timeout: 5 sec,
coordination-options: {
advance-message-interval: 100 msec
}
}

reactor Looper(incr: number = 1, delay: time = 0 msec) {
input inp: number
input inp2: number
output out: number
output out2: number
physical action a(delay)
state count: number = 0
timer t(0, 1 sec)

preamble {=
let stop = false;
// Function to trigger an action once every second.
function ping(act: any) {
if (!stop) {
console.log("Scheduling action.");
act.schedule(0, null);
setTimeout(ping, 1000, act);
}
}
=}

reaction(startup) -> a {=
// Start the ping function for triggering an action every second.
console.log("Starting ping function.");
ping(actions.a);
=}

reaction(a) -> out, out2 {=
if (count % 2 == 0) {
out = count;
} else {
out2 = count;
}
count += incr;
=}

reaction(inp) {=
console.log("Received " + inp + " on inp at logical time " + util.getElapsedLogicalTime() + ".");
=}

reaction(inp2) {=
console.log("Received " + inp2 + " on inp2 at logical time " + util.getElapsedLogicalTime() + ".");
=}

reaction(t) {=
console.log("Timer triggered at logical time " + util.getElapsedLogicalTime() + ".");
=}

reaction(shutdown) {=
console.log("******* Shutdown invoked.");
// Stop the ping function that is scheduling actions.
stop = true;
if (count != 5 * incr) {
util.requestErrorStop("Failed to receive all five expected inputs.");
}
=}
}

federated reactor(delay: time = 0) {
left = new Looper()
right = new Looper(incr=-1)
left.out -> right.inp
right.out -> left.inp
right.out2 -> left.inp2
left.out2 -> right.inp2
}
Loading

0 comments on commit 8bd0390

Please sign in to comment.