Federates wait for STP before exiting in decentralized coordination #2373

Closed
Depetrol opened this issue Jul 17, 2024 · 1 comment
Labels
bug Something isn't working

Comments

@Depetrol
Collaborator

In decentralized execution, when an STP (STAA) is set, the federation doesn't exit immediately after request_stop(). It waits until the STAA has elapsed and only then exits.

Here's a minimal example to reproduce:

target Python {
  coordination: decentralized
}

preamble {=
  import time
=}

reactor Client(STP_offset = {= FOREVER =}) {
  input server_message
  output client_message

  reaction(startup) {=
    print("Client Startup!")
  =}

  reaction(server_message) -> client_message {=
    val = server_message.value
    time.sleep(0.1)
    val += 1
    print("client:", val)
    if val==13:
        print("client done")
        request_stop()
    if val<13:
        client_message.set(val)
  =} STP(1000s) {=
    print("Client STP Violated!")
    exit(1)
  =}
}

reactor Server(STP_offset = {= FOREVER =}) {
  output server_message
  input client_message

  reaction(startup) -> server_message {=
    print("Server Startup!")
    server_message.set(0)
  =}

  reaction(client_message) -> server_message {=
    val = client_message.value
    time.sleep(0.1)
    val += 1
    print("server:", val)
    if val==12:
        print("server done")
        server_message.set(val)
        request_stop()
    if val<12:
        server_message.set(val)
  =} STP(1000s) {=
    print("Server STP Violated!")
    exit(1)
  =}
}

federated reactor(STP_offset = {= FOREVER =}) {
  client = new Client()
  server = new Server()
  server.server_message -> client.server_message
  client.client_message -> server.client_message after 0
}

Output logs:

...
server: 10
client: 11
server: 12
server done
DEBUG: RTI: Received message type 10 from federate 1.
DEBUG: RTI handling stop_request from federate 1.
LOG: RTI received from federate 1 a MSG_TYPE_STOP_REQUEST message with tag (0, 7).
LOG: RTI forwarded to federates MSG_TYPE_STOP_REQUEST with tag (0, 7).
DEBUG: RTI: Received message type 11 from federate 0.
LOG: RTI received from federate 0 STOP reply tag (0, 7).
LOG: RTI sent to federates MSG_TYPE_STOP_GRANTED with tag (0, 7)
client: 13
client done
DEBUG: Reading from socket 5 failed with error: `Resource temporarily unavailable`. Will try again.
DEBUG: Reading from socket 4 failed with error: `Resource temporarily unavailable`. Will try again.
DEBUG: Reading from socket 5 failed with error: `Resource temporarily unavailable`. Will try again.
...

When the STP is set to 10s, the federation exits after about 10s. When it is set to a large value, the federation doesn't exit in a reasonable time.
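
As a cross-check on the stop tag (0, 7) in the log, here is a small Python sketch of the exchange. It is a toy model, not the LF runtime: it assumes the `after 0` connection delivers one microstep later, the direct connection delivers at the same tag, and logical time stays at 0 because nothing in the program advances it. The function name `trace_exchange` is made up for illustration.

```python
def trace_exchange(server_stop=12):
    """Toy model of the ping-pong in the reproducer (not the LF runtime).

    Assumption: the `after 0` connection (client -> server) delivers one
    microstep later; the direct connection (server -> client) delivers at
    the same tag. Logical time is assumed to stay at 0 throughout.
    """
    events = []
    microstep = 0
    val = 0  # value the server sends in its startup reaction
    while True:
        val += 1  # client reacts to server_message at the same tag
        events.append(("client", val, microstep))
        microstep += 1  # client_message travels through `after 0`
        val += 1  # server reacts to client_message
        events.append(("server", val, microstep))
        if val == server_stop:
            return events


events = trace_exchange()
who, val, microstep = events[-1]
print(who, val, "at tag (0, %d)" % microstep)
```

Under these assumptions the server reaches 12 at tag (0, 6). If the stop request names the next microstep, that would account for the (0, 7) seen in the log.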

@Depetrol
Collaborator Author

Depetrol commented Aug 6, 2024

Solved by #2394, combined with shutting the reactions down correctly, as in the following code:

target Python {
  coordination: decentralized
}

preamble {=
  import time
=}

reactor Client(STP_offset = {= FOREVER =}) {
  input server_message
  output client_message

  reaction(startup) {=
    print("Client Startup!")
  =}

  reaction(server_message) -> client_message {=
    val = server_message.value
    time.sleep(0.1)
    val += 1
    print("client:", val)
    if val==13:
        print("client done")
        request_stop()
    client_message.set(val)
  =} STP(1000s) {=
    print("Client STP Violated!")
    exit(1)
  =}
}

reactor Server(STP_offset = {= FOREVER =}) {
  output server_message
  input client_message

  reaction(startup) -> server_message {=
    print("Server Startup!")
    server_message.set(0)
  =}

  reaction(client_message) -> server_message {=
    val = client_message.value
    time.sleep(0.1)
    val += 1
    print("server:", val)
    if val==12:
        print("server done")
        server_message.set(val)
        request_stop()
    server_message.set(val)
  =} STP(1000s) {=
    print("Server STP Violated!")
    exit(1)
  =}
}

federated reactor(STP_offset = {= FOREVER =}) {
  client = new Client()
  server = new Server()
  server.server_message -> client.server_message
  client.client_message -> server.client_message after 0
}
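
Compared with the reproducer, the reactions in this version set their outputs unconditionally instead of only while the counter is below the limit. The Python sketch below models just that branching for the client. The function names are hypothetical and it is not LF code. A plausible reading, not confirmed in this thread, is that the extra message keeps the peer from waiting on an input that never arrives.

```python
def client_sets_output_original(val):
    # Reproducer: client_message.set(val) is guarded by `if val<13`.
    return val < 13


def client_sets_output_fixed(val):
    # Fixed version: client_message.set(val) runs on every invocation.
    return True


# Compare an ordinary step (11) with the final step (13).
for val in (11, 13):
    print(val, client_sets_output_original(val), client_sets_output_fixed(val))
```

The two versions differ only at the final value, 13, where the reproducer stays silent and the fixed version still sends.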

@Depetrol Depetrol closed this as completed Aug 6, 2024