Skip to content

Commit

Permalink
fix stopping stream through asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
timonmerk committed Sep 27, 2024
1 parent 6c49866 commit 731546b
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 28 deletions.
8 changes: 4 additions & 4 deletions gui_dev/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
"@vitejs/plugin-react": "^4.3.1",
"@welldone-software/why-did-you-render": "^8.0.3",
"babel-plugin-react-compiler": "latest",
"eslint": "^9.11.0",
"eslint": "^9.11.1",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-jsdoc": "^50.2.4",
"eslint-plugin-react": "^7.36.1",
"eslint-plugin-jsdoc": "^50.3.0",
"eslint-plugin-react": "^7.37.0",
"eslint-plugin-react-compiler": "latest",
"eslint-plugin-react-hooks": "^4.6.2",
"eslint-plugin-react-refresh": "^0.4.12",
"prettier": "^3.3.3",
"vite": "^5.4.7"
"vite": "^5.4.8"
}
}
17 changes: 6 additions & 11 deletions py_neuromodulation/gui/backend/app_backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as np
import logging
import asyncio
import importlib.metadata
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -116,8 +117,8 @@ async def handle_stream_control_stop(data: dict):
action = data["action"]
self.logger.info("Stopping stream")
if action == "stop":
self.pynm_state.stream_handling_queue.put("stop")
self.pynm_state.stream.stream_handling_queue.put("stop")
asyncio.create_task(self.pynm_state.stream_handling_queue.put("stop"))
#self.pynm_state.stream.stream_handling_queue.put("stop")
return {"message": f"Stream action '{action}' executed"}

@self.post("/api/stream-control")
Expand All @@ -129,20 +130,14 @@ async def handle_stream_control(data: dict):
self.logger.info(self.websocket_manager)
# TODO: I cannot interact with stream_state_queue,
# since the async function is really waiting until the stream finished
await self.pynm_state.start_run_function(
asyncio.create_task(self.pynm_state.start_run_function(
#out_dir=data["out_dir"],
#experiment_name=data["experiment_name"],
websocket_manager_features=self.websocket_manager,
)
))

if action == "stop":
#if self.pynm_state.stream.is_running is False:
# # TODO: if the message starts with ERROR we could show the message in a popup
# return {"message": "ERROR: Stream is not running"}

# initiate stream stop and feature save
self.pynm_state.stream_handling_queue.put("stop")
self.pynm_state.stream.stream_handling_queue.put("stop")
await self.pynm_state.stream_handling_queue.put("stop")

return {"message": f"Stream action '{action}' executed"}

Expand Down
23 changes: 12 additions & 11 deletions py_neuromodulation/gui/backend/app_pynm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import numpy as np
from multiprocessing import Process, Queue
from multiprocessing import Process

from py_neuromodulation.stream import Stream, NMSettings
from py_neuromodulation.utils import set_channels
Expand Down Expand Up @@ -33,7 +33,7 @@ async def start_run_function(
# The stream will then put the results in the queue
# there should be another websocket in which the results are sent to the frontend

self.stream_handling_queue = Queue()
self.stream_handling_queue = asyncio.Queue()

self.logger.info("setup stream Process")

Expand All @@ -51,15 +51,16 @@ async def start_run_function(
# },
# )
#asyncio.run(
await self.stream.run(
out_dir=out_dir,
experiment_name=experiment_name,
stream_handling_queue=self.stream_handling_queue,
is_stream_lsl=self.lsl_stream_name is not None,
stream_lsl_name=self.lsl_stream_name
if self.lsl_stream_name is not None
else "",
websocket_featues=websocket_manager_features,
asyncio.create_task(self.stream.run(
out_dir=out_dir,
experiment_name=experiment_name,
stream_handling_queue=self.stream_handling_queue,
is_stream_lsl=self.lsl_stream_name is not None,
stream_lsl_name=self.lsl_stream_name
if self.lsl_stream_name is not None
else "",
websocket_featues=websocket_manager_features,
)
)

# self.logger.info("initialized run process")
Expand Down
7 changes: 5 additions & 2 deletions py_neuromodulation/stream/stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module for generic and offline data streams."""

import asyncio
from typing import TYPE_CHECKING
from collections.abc import Iterator
import numpy as np
Expand Down Expand Up @@ -297,10 +298,12 @@ async def run(
prev_batch_end = 0
for timestamps, data_batch in self.generator:
self.is_running = True
await asyncio.sleep(0.001)
if self.stream_handling_queue is not None:
nm.logger.info("Checking for stop signal")
if not self.stream_handling_queue.empty():
value = self.stream_handling_queue.get()
if value == "stop":
stop_signal = await asyncio.wait_for(self.stream_handling_queue.get(), timeout=0.01)
if stop_signal == "stop":
break
if data_batch is None:
break
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dependencies = [
"uvicorn>=0.30.6",
"websockets>=13.0",
"seaborn >= 0.11",
"cbor2>=5.6.4",
]

[project.optional-dependencies]
Expand Down

0 comments on commit 731546b

Please sign in to comment.