diff --git a/__tests__/action.test.ts b/__tests__/action.test.ts index 3f13d376..ca73e8e3 100644 --- a/__tests__/action.test.ts +++ b/__tests__/action.test.ts @@ -51,7 +51,7 @@ describe("Intended tag tests", function () { const app = new ReactorWithFederatePortAction(); expect(() => { app._start(); - }).toThrowError("FederatedPortAction must have an intended tag from RTI."); + }).toThrowError("No intended tag given while attempting to schedule an event coming from another federate."); }); it("Intended tag smaller than current tag", function () { diff --git a/__tests__/time.test.ts b/__tests__/time.test.ts index a21994f3..0c83b15f 100644 --- a/__tests__/time.test.ts +++ b/__tests__/time.test.ts @@ -310,12 +310,12 @@ describe("add time value", function () { expect( new Tag(oneThousandMS, 0) .getLaterTag(straightZero) - .isSimultaneousWith(new Tag(oneThousandMS, 0)) + .isSimultaneousWith(new Tag(oneThousandMS, 1)) ).toBeTruthy(); expect( new Tag(oneThousandMS, 1) .getLaterTag(straightZero) - .isSimultaneousWith(new Tag(oneThousandMS, 1)) + .isSimultaneousWith(new Tag(oneThousandMS, 2)) ).toBeTruthy(); }); diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f9..2317316f 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +ts-cyclic-dependencies diff --git a/src/core/action.ts b/src/core/action.ts index d8dd5ce2..21d6b6ea 100644 --- a/src/core/action.ts +++ b/src/core/action.ts @@ -79,20 +79,11 @@ export class Action extends ScheduledTrigger implements Read { tag = tag.getLaterTag(delay); if (this.action.origin === Origin.physical) { - // If the resulting timestamp from delay is less than the current physical time - // on the platform, then the timestamp becomes the current physical time. - // Otherwise the tag is computed like a logical action's tag. - - const physicalTime = getCurrentPhysicalTime(); - if (tag.time.isEarlierThan(physicalTime)) { - tag = new Tag(getCurrentPhysicalTime(), 0); - } else { - tag = tag.getMicroStepsLater(1); - } + tag = new Tag(getCurrentPhysicalTime(), 0).getLaterTag(delay); } else if (this.action instanceof FederatePortAction) { if (intendedTag === undefined) { throw new Error( - "FederatedPortAction must have an intended tag from RTI." + "No intended tag given while attempting to schedule an event coming from another federate." ); } if ( @@ -132,8 +123,6 @@ export class Action extends ScheduledTrigger implements Read { )}` ); tag = intendedTag; - } else if (delay.isEqualTo(TimeValue.zero())) { - tag = tag.getMicroStepsLater(1); } Log.debug( @@ -198,7 +187,7 @@ export class FederatePortAction extends Action { constructor( __parent__: Reactor, origin: Origin, - minDelay: TimeValue = TimeValue.secs(0) + minDelay: TimeValue = TimeValue.zero() ) { super(__parent__, origin, minDelay); } diff --git a/src/core/federation.ts b/src/core/federation.ts index 833c89fc..6de89654 100644 --- a/src/core/federation.ts +++ b/src/core/federation.ts @@ -19,7 +19,6 @@ import { App, Reactor } from "./internal"; - // ---------------------------------------------------------------------// // Federated Execution Constants and Enums // // ---------------------------------------------------------------------// @@ -288,10 +287,37 @@ function isANodeJSCodedError(e: Error): e is NodeJSCodedError { return typeof (e as NodeJSCodedError).code === "string"; } +abstract class NetworkReactor extends Reactor { + // TPO level of this NetworkReactor + protected readonly tpoLevel?: number; + + constructor(parent: Reactor, tpoLevel: number | undefined) { + super(parent); + this.tpoLevel = tpoLevel; + } + + /** + * Getter for the TPO level of this NetworkReactor. + */ + public getTpoLevel(): number | undefined { + return this.tpoLevel; + } + + /** + * This function returns this network reactor's own reactions. + * The edges of those reactions (e.g. port absent reactions, port present reactions, ...) + * should be added to the dependency graph according to TPO levels. + * @returns + */ + public getReactions(): Array> { + return this._getReactions(); + } +} + /** * A network sender is a reactor containing a portAbsentReaction. */ -export class NetworkSender extends Reactor { +export class NetworkSender extends NetworkReactor { /** * The last reaction of a NetworkSender reactor is the "port absent" reaction. * @returns the "port absent" of this reactor @@ -304,7 +330,7 @@ export class NetworkSender extends Reactor { /** * A network receiver is a reactor handling a network input. */ -export class NetworkReceiver extends Reactor { +export class NetworkReceiver extends NetworkReactor { /** * A schedulable action of this NetworkReceiver's network input. */ @@ -315,6 +341,18 @@ export class NetworkReceiver extends Reactor { */ private networkInputActionOrigin: Origin | undefined; + /** + * Last known status of the port, either via a timed message, a port absent, + * or a TAG from the RTI. + */ + public lastKnownStatusTag: Tag; + + constructor(parent: Reactor, tpoLevel: number | undefined) { + super(parent, tpoLevel); + // this.portStatus = PortStatus.UNKNOWN; + this.lastKnownStatusTag = new Tag(TimeValue.never()); + } + /** * Register a federate port's action with the network receiver. * @param networkInputAction The federate port's action for registration. @@ -328,8 +366,14 @@ export class NetworkReceiver extends Reactor { this.networkInputActionOrigin = networkInputAction.origin; } + public getNetworkInputActionOrigin(): Origin | undefined { + return this.networkInputActionOrigin; + } + /** - * Handle a message being received from the RTI. + * Handle a timed message being received from the RTI. + * This function is for NetworkReceiver reactors. + * @param portID The destination port ID of the message. * @param value The payload of the message. */ public handleMessage(value: T): void { @@ -1187,6 +1231,11 @@ export class FederatedApp extends App { */ private readonly rtiClient: RTIClient; + /** + * Variable to track how far in the reaction queue we can go until we need to wait for more network port statuses to be known. + */ + private maxLevelAllowedToAdvance = 0; + /** * An array of network receivers */ @@ -1200,6 +1249,9 @@ export class FederatedApp extends App { */ private readonly networkSenders: NetworkSender[] = []; + /** + * An array of port absent reactions + */ private readonly portAbsentReactions = new Set>(); /** @@ -1329,9 +1381,13 @@ export class FederatedApp extends App { () => `Adding dummy event for time: ${physicalTime}` ); this._addDummyEvent(new Tag(physicalTime)); + // Notify the NET to the RTI. + this.sendRTINextEventTag(new Tag(physicalTime)); return false; } } + // FIXME: NET should be sent from a federate that doesn't have any upstream federates + // See the issue #134. this.sendRTINextEventTag(nextEvent.tag); Log.debug( this, @@ -1347,6 +1403,44 @@ export class FederatedApp extends App { return true; } + /** + * @override + * Iterate over all reactions in the reaction queue and execute them. + * If a reaction's priority is less than MLAA, stop executing and return. + * @returns Whether every reactions at this tag are executed. + */ + protected _react(): boolean { + let r: Reaction; + while (this._reactionQ.size() > 0) { + r = this._reactionQ.peek(); + if ( + this.upstreamFedIDs.length === 0 || + r.getPriority() < this.maxLevelAllowedToAdvance + ) { + try { + r = this._reactionQ.pop(); + r.doReact(); + } catch (e) { + Log.error(this, () => `Exception occurred in reaction: ${r}: ${e}`); + // Allow errors in reactions to kill execution. + throw e; + } + } else { + Log.global.debug( + "Max level allowed to advance is higher than the next reaction's priority." + ); + return false; + } + } + if (this.maxLevelAllowedToAdvance !== Number.MAX_SAFE_INTEGER) { + // The reaction queue is empty but some network receivers are waiting for + // network inputs. + return false; + } + Log.global.debug("Finished handling all events at current time."); + return true; + } + protected _iterationComplete(): void { const currentTime = this.util.getCurrentTag(); this.sendRTILogicalTimeComplete(currentTime); @@ -1461,15 +1555,162 @@ export class FederatedApp extends App { } /** - * Enqueue network output control reactions that will send a MSG_TYPE_PORT_ABSENT + * Update the last known status tag of all network input ports + * to the value of "tag", unless that the provided `tag` is less + * than the last_known_status_tag of the port. This is called when + * this federated receives a TAG. + * @param tag The received TAG + */ + private _updateLastKnownStatusOnInputPorts(tag: Tag): void { + Log.debug(this, () => { + return "In update_last_known_status_on_input ports."; + }); + let anyStatusChanged = false; + this.networkReceivers.forEach( + (networkReceiver: NetworkReceiver, portID: number) => { + // This is called when a TAG is received. + // But it is possible for an input port to have received already + // a message with a larger tag (if there is an after delay on the + // connection), in which case, the last known status tag of the port + // is in the future and should not be rolled back. So in that case, + // we do not update the last known status tag. + if (tag.isGreaterThanOrEqualTo(networkReceiver.lastKnownStatusTag)) { + Log.debug(this, () => { + return ( + `Updating the last known status tag of port ${portID} from ` + + `${networkReceiver.lastKnownStatusTag} to ${tag}.` + ); + }); + networkReceiver.lastKnownStatusTag = tag; + anyStatusChanged = true; + } + } + ); + if (anyStatusChanged) { + this._updateMaxLevel(); + } + } + + /** + * Update the last known status tag of a network input port + * to the value of "tag". This is the largest tag at which the status + * (present or absent) of the port was known. + * @param tag The tag on which the latest status of network input + * ports is known. + * @param portID The port ID + */ + private _updateLastKnownStatusOnInputPort(tag: Tag, portID: number): void { + const networkReceiver = this.networkReceivers.get(portID); + if (networkReceiver !== undefined) { + if (tag.isGreaterThanOrEqualTo(networkReceiver.lastKnownStatusTag)) { + if (tag.isSimultaneousWith(networkReceiver.lastKnownStatusTag)) { + // If the intended tag for an input port is equal to the last known status, we need + // to increment the microstep. This is a direct result of the behavior of the getLaterTag() + // semantics in time.ts. + tag = tag.getMicroStepsLater(1); + } + Log.debug(this, () => { + return ( + `Updating the last known status tag of port ${portID} from ` + + `${networkReceiver.lastKnownStatusTag} to ${tag}.` + ); + }); + networkReceiver.lastKnownStatusTag = tag; + const prevMLAA = this.maxLevelAllowedToAdvance; + this._updateMaxLevel(); + if (prevMLAA < this.maxLevelAllowedToAdvance) { + this._requestImmediateInvocationOfNext(); + } + } else { + Log.debug(this, () => { + return ( + "Attempt to update the last known status tag " + + `of network input port ${portID} to an earlier tag was ignored.` + ); + }); + } + } + } + + /** + * Enqueue network port absent reactions that will send a MSG_TYPE_PORT_ABSENT * message to downstream federates if a given network output port is not present. */ - protected enqueuePortAbsentReactions(): void { + private _enqueuePortAbsentReactions(): void { this.portAbsentReactions.forEach((reaction) => { this._reactionQ.push(reaction); }); } + /** + * Update the max level allowed to advance (MLAA) for the current logical timestep. + * If it's safe to complete the current tag by the last TAG, set the MLAA to the infinity. + * Else, check the network port's status and update the MLAA. + * + * @param tag + * @param isProvisional + */ + private _updateMaxLevel(): void { + this.maxLevelAllowedToAdvance = Number.MAX_SAFE_INTEGER; + Log.debug(this, () => { + return `last TAG = ${this.greatestTimeAdvanceGrant.time}`; + }); + if ( + this.util.getCurrentTag().isSmallerThan(this.greatestTimeAdvanceGrant) || + (this.util + .getCurrentTag() + .isSimultaneousWith(this.greatestTimeAdvanceGrant) && + !this._isLastTAGProvisional) + ) { + Log.debug(this, () => { + return ( + `Updated MLAA to ${ + this.maxLevelAllowedToAdvance + } at time ${this.util.getElapsedLogicalTime()} ` + + `with lastTAG = ${ + this.greatestTimeAdvanceGrant + } and current time ${this.util.getCurrentLogicalTime()}.` + ); + }); + return; // Safe to complete the current tag. + } + for (const networkReceiver of Array.from( + this.networkReceivers.values() + ).filter((receiver) => receiver.getTpoLevel() !== undefined)) { + if ( + this.util + .getCurrentTag() + .isGreaterThan(networkReceiver.lastKnownStatusTag) && + networkReceiver.getNetworkInputActionOrigin() !== Origin.physical + ) { + const candidate = networkReceiver.getReactions()[0].getPriority(); + if (this.maxLevelAllowedToAdvance > candidate) { + this.maxLevelAllowedToAdvance = candidate; + } + } + } + Log.debug(this, () => { + return ( + `Updated MLAA to ${ + this.maxLevelAllowedToAdvance + } at time ${this.util.getElapsedLogicalTime()} ` + + `with lastTAG = ${ + this.greatestTimeAdvanceGrant + } and current time ${this.util.getCurrentLogicalTime()}.` + ); + }); + } + + /** + * @override + * Do the steps needed for the new logical tag. + * Enqueue network port absent reactions and update max level for the new tag. + */ + protected _startTimeStep(): void { + this._enqueuePortAbsentReactions(); + this._updateMaxLevel(); + } + /** * Send a message to a potentially remote federate's port via the RTI. This message * is untimed, and will be timestamped by the destination federate when it is received. @@ -1552,6 +1793,22 @@ export class FederatedApp extends App { * advance logical time. */ public sendRTINextEventTag(nextTag: Tag): void { + if ( + this.upstreamFedIDs.length === 0 && + this.downstreamFedIDs.length === 0 + ) { + // This federate is not connected (except possibly by physical links) + // so there is no need for the RTI to get involved. + this.greatestTimeAdvanceGrant = nextTag; + this._requestImmediateInvocationOfNext(); + Log.debug(this, () => { + return ( + `Granted tag ${nextTag} because the federate has neither ` + + "upstream nor downstream federates." + ); + }); + return; + } Log.debug(this, () => { return `Sending RTI next event time with time: ${nextTag}`; }); @@ -1619,6 +1876,67 @@ export class FederatedApp extends App { this.rtiClient.closeRTIConnection(); } + /** + * Add edges between reactions of network sender and receiver reactors. + * @note Network reactors from a connection with a delay do not have a TPO leve and + * are ignored in this operation. + */ + _addEdgesForTpoLevels(): void { + const networkReceivers = Array.from(this.networkReceivers.values()).filter( + (receiver) => receiver.getTpoLevel() !== undefined + ) as NetworkReactor[]; + const networkReactors = networkReceivers.concat( + this.networkSenders.filter( + (sender) => sender.getTpoLevel() !== undefined + ) as NetworkReactor[] + ); + networkReactors.sort((a: NetworkReactor, b: NetworkReactor): number => { + const tpoOfA = a.getTpoLevel(); + const tpoOfB = b.getTpoLevel(); + if (tpoOfA !== undefined && tpoOfB !== undefined) { + return tpoOfA - tpoOfB; + } else { + Log.error(this, () => { + return "Attempts to add edges for reactions that do not have a TPO level."; + }); + return 0; + } + }); + + for (let i = 0; i < networkReactors.length - 1; i++) { + for (const upstream of networkReactors[i].getReactions()) { + for (const downstream of networkReactors[i + 1].getReactions()) { + this._dependencyGraph.addEdge(upstream, downstream); + } + } + } + } + + /** + * Start executing reactions. + */ + protected _startExecuting(): void { + Log.info(this, () => Log.hr); + Log.info(this, () => Log.hr); + + Log.info( + this, + () => `>>> Start of execution: ${this.util.getCurrentTag()}` + ); + Log.info(this, () => Log.hr); + + // Send RTI a NET and wait for PTAG or TAG. + this.sendRTINextEventTag(this.util.getCurrentTag()); + + if ( + this.greatestTimeAdvanceGrant.isSimultaneousWith(this.util.getStartTag()) + ) { + // PTAG for the start tag is already received, call _next immediately. + this._updateMaxLevel(); + this._requestImmediateInvocationOfNext(); + } + } + /** * @override * Register this federated app with the RTI and request a start time. @@ -1628,10 +1946,14 @@ export class FederatedApp extends App { * time message from the RTI. */ _start(): void { + this._addEdgesForTpoLevels(); + this._analyzeDependencies(); this._loadStartupReactions(); + this._startTimeStep(); + this.rtiClient.on("connected", () => { this.rtiClient.sendNeighborStructure( this.upstreamFedIDs, @@ -1700,6 +2022,7 @@ export class FederatedApp extends App { try { this.networkReceivers.get(destPortID)?.handleTimedMessage(value, tag); + this._updateLastKnownStatusOnInputPort(tag, destPortID); } catch (e) { Log.error(this, () => { return `${e}`; @@ -1712,33 +2035,62 @@ export class FederatedApp extends App { Log.debug(this, () => { return `Time Advance Grant received from RTI for ${tag}.`; }); - if (this.greatestTimeAdvanceGrant.isSmallerThan(tag)) { + if (this.greatestTimeAdvanceGrant.isSmallerThanOrEqualTo(tag)) { // Update the greatest time advance grant and immediately // wake up _next, in case it was blocked by the old time advance grant this.greatestTimeAdvanceGrant = tag; this._isLastTAGProvisional = false; + this._updateLastKnownStatusOnInputPorts(tag); this._requestImmediateInvocationOfNext(); + } else { + Log.error(this, () => { + return ( + `Received a TAG ${tag} that wasn't larger than the previous ` + + `TAG or PTAG ${this.greatestTimeAdvanceGrant}. Ignoring the TAG.` + ); + }); } }); - this.rtiClient.on("provisionalTimeAdvanceGrant", (tag: Tag) => { + this.rtiClient.on("provisionalTimeAdvanceGrant", (ptag: Tag) => { + if ( + ptag.isSmallerThan(this.greatestTimeAdvanceGrant) || + (ptag.isSimultaneousWith(this.greatestTimeAdvanceGrant) && + !this._isLastTAGProvisional) + ) { + Log.error(this, () => { + return ( + `Received a PTAG ${ptag} that is equal to or earlier than` + + `an already received TAG ${this.greatestTimeAdvanceGrant}.` + ); + }); + } Log.debug(this, () => { return `Provisional Time Advance Grant received from RTI for ${String( - tag + ptag )}.`; }); - if (this.greatestTimeAdvanceGrant.isSmallerThan(tag)) { - // Update the greatest time advance grant and immediately - // wake up _next, in case it was blocked by the old time advance grant - // FIXME: Temporarily disabling PTAG handling until the - // MLAA based execution is implemented. - /* - this.greatestTimeAdvanceGrant = tag; - this._isLastTAGProvisional = true; - this._requestImmediateInvocationOfNext(); - */ - // + // Update the greatest time advance grant and update MLAA. + this.greatestTimeAdvanceGrant = ptag; + this._isLastTAGProvisional = true; + if (!this._active) { + // PTAG is received before starting execution, return. + // The pending process of PTAG will be done in _startExecuting. + return; + } + this._updateMaxLevel(); + // Possibly insert a dummy event into the event queue if current time is behind. + if (this.util.getCurrentTag().isSmallerThan(ptag)) { + this._addDummyEvent(ptag); + Log.debug(this, () => { + return ( + `At tag ${this.util.getCurrentTag()}, inserting the event queue a dummy event ` + + `with tag ${ptag}.` + ); + }); } + // Wake up _next, in case it was blocked by the old time advance grant. + this._requestImmediateInvocationOfNext(); }); this.rtiClient.on("stopRequest", (tag: Tag) => { @@ -1779,12 +2131,7 @@ export class FederatedApp extends App { Log.debug(this, () => { return `Port Absent received from RTI for ${intendedTag}.`; }); - // FIXME: Temporarily disabling portAbsent until the - // MLAA based execution is implemented. - // this.updatelastKnownStatusTag(intendedTag, portID); - // if (this._isReactionRemainedAtThisTag === true) { - // this._requestImmediateInvocationOfNext(); - // } + this._updateLastKnownStatusOnInputPort(intendedTag, portID); }); this.rtiClient.connectToRTI(this.rtiPort, this.rtiHost); diff --git a/src/core/graph.ts b/src/core/graph.ts index be915dde..3a3a72ae 100644 --- a/src/core/graph.ts +++ b/src/core/graph.ts @@ -373,25 +373,34 @@ export class SortablePrecedenceGraph< if (pg == null || type == null) return; const visited = new Set(); - const search = (parentNode: T, nodes: Set): void => { - for (const node of nodes) { - if (node instanceof type) { - this.addEdge(node, parentNode); - if (!visited.has(node)) { - visited.add(node); - search(node, pg.getUpstreamNeighbors(node)); + const startNodes = pg.getSourceNodes(); + + const search = (upstreamNode: T, downstreamNodes: Set): void => { + for (const downstreamNode of downstreamNodes) { + if (downstreamNode instanceof type) { + this.addEdge(upstreamNode, downstreamNode); + if (!visited.has(downstreamNode)) { + visited.add(downstreamNode); + search(downstreamNode, pg.getDownstreamNeighbors(downstreamNode)); } } else { - search(parentNode, pg.getUpstreamNeighbors(node)); + // Look further downstream for neighbors that match the type. + search(upstreamNode, pg.getDownstreamNeighbors(downstreamNode)); } } }; - const leafs = pg.getSinkNodes(); - for (const leaf of leafs) { - if (leaf instanceof type) { - this.addNode(leaf); - search(leaf, pg.getUpstreamNeighbors(leaf)); - visited.clear(); + + for (const node of startNodes) { + if (node instanceof type) { + this.addNode(node); + search(node, pg.getDownstreamNeighbors(node)); + } else { + // Look further upstream for start nodes that match the type. + for (const newStartNode of pg.getDownstreamNeighbors(node)) { + if (!visited.has(newStartNode)) { + startNodes.add(newStartNode); + } + } } } } diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 93d0e167..806bf2af 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -1833,6 +1833,14 @@ export class App extends Reactor { */ private readonly _reactorsToRemove = new Array(); + /** + * Stores whether the current tag's reactions queue is empty. + * This will be false when a federate waits for network inputs. + * Note that this should be initialized with false to handle startup + * reactions of each federate. + */ + private _isDone = false; + /** * Stores whether the last received TAG (Tag Advance Grant) was provisional. * Every federate starts out assuming that it has been granted a PTAG @@ -2273,8 +2281,10 @@ export class App extends Reactor { /** * Iterate over all reactions in the reaction queue and execute them. + * @returns Whether every reactions at this tag are executed. This can + * be false in the federated exection. */ - private _react(): void { + protected _react(): boolean { let r: Reaction; while (this._reactionQ.size() > 0) { try { @@ -2287,12 +2297,79 @@ export class App extends Reactor { } } Log.global.debug("Finished handling all events at current time."); + return true; } - protected enqueuePortAbsentReactions(): void { + /** + * Do the steps needed for the new logical tag. + * This function is overriden by federation.ts. + */ + protected _startTimeStep(): void { return undefined; } + /** + * Pop all events from event_q with timestamp equal to current tag, + * extract all the reactions triggered by these events, and stick them + * into the reaction queue. + */ + private _popEvents(): void { + // Execute all reactions that are triggered at the current tag + // in topological order. After that, if the next event on the event queue + // has the same time (but a greater microstep), repeat. This prevents + // JS event loop from gaining control and imposing overhead. Asynchronous + // activity therefore might get blocked, but since the results of such + // activities are typically reported via physical actions, the tags of + // the resulting events would be in the future, anyway. + let nextEvent = this._eventQ.peek(); + do { + // Keep popping the event queue until the next event has a different tag. + while (nextEvent?.tag.isSimultaneousWith(this._currentTag) ?? false) { + const trigger = nextEvent?.trigger; + this._eventQ.pop(); + Log.debug(this, () => `Popped off the event queue: ${trigger}`); + // Handle timers. + if (trigger instanceof Timer) { + if (!trigger.period.isZero()) { + Log.debug(this, () => `Rescheduling timer ${trigger}`); + + this.__runtime.schedule( + new TaggedEvent( + trigger, + this._currentTag.getLaterTag(trigger.period), + null + ) + ); + } + } + + // Load reactions onto the reaction queue. + if (nextEvent != null) { + trigger?.update(nextEvent); + } + + // Look at the next event on the queue. + nextEvent = this._eventQ.peek(); + } + + // // End of this execution step. Perform cleanup. + // while (this._reactorsToRemove.length > 0) { + // // const r = this._reactorsToRemove.pop(); + // // FIXME: doing this for the entire model at the end of execution + // // could be a pretty significant performance hit, so we probably + // // don't want to do this + // // r?._unplug() FIXME: visibility + // } + + // Peek at the event queue to see whether we can process the next event + // or should give control back to the JS event loop. + nextEvent = this._eventQ.peek(); + } while ( + nextEvent != null && + this._currentTag.isSimultaneousWith(nextEvent.tag) + ); + } + /** * Handle the next events on the event queue. * ---- @@ -2312,89 +2389,43 @@ export class App extends Reactor { * stimuli. */ private _next(): void { - // TODO: Check the MLAA and execute only allowed reactions let nextEvent = this._eventQ.peek(); - if (nextEvent != null) { - // Check whether the next event can be handled, or not quite yet. - // A holdup can occur in a federated execution. - if (!this._canProceed(nextEvent)) { - // If this happens, then a TAG from the RTI will trigger the - // next invocation of _next. - return; - } - // If it is too early to handle the next event, set a timer for it - // (unless the "fast" option is enabled), and give back control to - // the JS event loop. - if ( - getCurrentPhysicalTime().isEarlierThan(nextEvent.tag.time) && - !this._fast - ) { - this._setAlarmOrYield(nextEvent.tag); - return; - } - - // Advance logical time. - this._advanceTime(nextEvent.tag); - - // Start processing events. Execute all reactions that are triggered - // at the current tag in topological order. After that, if the next - // event on the event queue has the same time (but a greater - // microstep), repeat. This prevents JS event loop from gaining - // control and imposing overhead. Asynchronous activity therefore - // might get blocked, but since the results of such activities are - // typically reported via physical actions, the tags of the - // resulting events would be in the future, anyway. - do { - // Keep popping the event queue until the next event has a different tag. - while (nextEvent?.tag.isSimultaneousWith(this._currentTag) ?? false) { - const trigger = nextEvent?.trigger; - this._eventQ.pop(); - Log.debug(this, () => `Popped off the event queue: ${trigger}`); - // Handle timers. - if (trigger instanceof Timer) { - if (!trigger.period.isZero()) { - Log.debug(this, () => `Rescheduling timer ${trigger}`); - - this.__runtime.schedule( - new TaggedEvent( - trigger, - this._currentTag.getLaterTag(trigger.period), - null - ) - ); - } - } - - // Load reactions onto the reaction queue. - if (nextEvent != null) { - trigger?.update(nextEvent); - } - - // Look at the next event on the queue. - nextEvent = this._eventQ.peek(); + if (nextEvent != null || !this._isDone) { + if (nextEvent != null && this._isDone) { + // We're trying to advance a tag to the next event. + + // Check whether the next event can be handled, or not quite yet. + // A holdup can occur in a federated execution. + if (!this._canProceed(nextEvent)) { + // If this happens, then a TAG from the RTI will trigger the + // next invocation of _next. + return; } - - // End of this execution step. Perform cleanup. - while (this._reactorsToRemove.length > 0) { - // const r = this._reactorsToRemove.pop(); - // FIXME: doing this for the entire model at the end of execution - // could be a pretty significant performance hit, so we probably - // don't want to do this - // r?._unplug() FIXME: visibility + // If it is too early to handle the next event, set a timer for it + // (unless the "fast" option is enabled), and give back control to + // the JS event loop. + if ( + getCurrentPhysicalTime().isEarlierThan(nextEvent.tag.time) && + !this._fast + ) { + this._setAlarmOrYield(nextEvent.tag); + return; } - // Peek at the event queue to see whether we can process the next event - // or should give control back to the JS event loop. - nextEvent = this._eventQ.peek(); - } while ( - nextEvent != null && - this._currentTag.isSimultaneousWith(nextEvent.tag) - ); - // enqueue portAbsentReactions - this.enqueuePortAbsentReactions(); + // Advance logical time. + this._advanceTime(nextEvent.tag); + + // Start time step. + this._startTimeStep(); + } + // Start processing events. + this._popEvents(); // React to all the events loaded onto the reaction queue. - this._react(); + this._isDone = this._react(); + if (!this._isDone) { + return; + } nextEvent = this._eventQ.peek(); // Done handling events. @@ -2673,11 +2704,9 @@ export class App extends Reactor { Log.info(this, () => `>>> Start of execution: ${this._currentTag}`); Log.info(this, () => Log.hr); - // enqueue portAbsentReactions - this.enqueuePortAbsentReactions(); // Handle the reactions that were loaded onto the reaction queue. - this._react(); + this._isDone = this._react(); // Continue execution by processing the next event. this._next(); diff --git a/src/core/time.ts b/src/core/time.ts index 0cd849a4..a5f867f1 100644 --- a/src/core/time.ts +++ b/src/core/time.ts @@ -467,8 +467,10 @@ export class Tag { * @param delay The time interval to add to this time instant. */ getLaterTag(delay: TimeValue | undefined): Tag { - if (delay === undefined || delay.isZero()) { + if (delay === undefined) { return this; + } else if (delay.isZero()) { + return new Tag(this.time, this.microstep + 1); } else { return new Tag(delay.add(this.time), 0); }