From 7b8eb34f46301d88b6d5912633ca1d3d45e74aa0 Mon Sep 17 00:00:00 2001 From: WinPlay02 Date: Mon, 4 Dec 2023 22:28:21 +0100 Subject: [PATCH 1/4] fix: use spawn instead of fork to not deadlock when running tests --- src/safeds_runner/server/pipeline_manager.py | 1 + tests/safeds_runner/server/test_runner_main.py | 1 + tests/safeds_runner/server/test_websocket_mock.py | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/safeds_runner/server/pipeline_manager.py b/src/safeds_runner/server/pipeline_manager.py index 7db0d84..b509a5e 100644 --- a/src/safeds_runner/server/pipeline_manager.py +++ b/src/safeds_runner/server/pipeline_manager.py @@ -41,6 +41,7 @@ def __init__(self) -> None: @cached_property def _multiprocessing_manager(self) -> SyncManager: + multiprocessing.set_start_method('spawn') return multiprocessing.Manager() @cached_property diff --git a/tests/safeds_runner/server/test_runner_main.py b/tests/safeds_runner/server/test_runner_main.py index 0e69e22..d95252a 100644 --- a/tests/safeds_runner/server/test_runner_main.py +++ b/tests/safeds_runner/server/test_runner_main.py @@ -7,6 +7,7 @@ def test_should_runner_start_successfully() -> None: + subprocess._USE_VFORK = False # Do not fork the subprocess as it is unsafe to do process = subprocess.Popen(["poetry", "run", "safe-ds-runner", "start"], cwd=_project_root, stderr=subprocess.PIPE) while process.poll() is None: process_line = str(typing.cast(IO[bytes], process.stderr).readline(), "utf-8").strip() diff --git a/tests/safeds_runner/server/test_websocket_mock.py b/tests/safeds_runner/server/test_websocket_mock.py index f428ed1..08d0891 100644 --- a/tests/safeds_runner/server/test_websocket_mock.py +++ b/tests/safeds_runner/server/test_websocket_mock.py @@ -44,7 +44,7 @@ def wait_for_messages(self, wait_for_messages: int = 1) -> None: with self.condition_variable: if len(self.received) >= wait_for_messages: return - self.condition_variable.wait() + self.condition_variable.wait(1.0) # this should not be needed, but it seems the process can get stuck @pytest.mark.parametrize( From 357e8cdb2d9e769e2686b35dc88a5a3bf394abb4 Mon Sep 17 00:00:00 2001 From: megalinter-bot <129584137+megalinter-bot@users.noreply.github.com> Date: Mon, 4 Dec 2023 21:32:10 +0000 Subject: [PATCH 2/4] style: apply automated linter fixes --- src/safeds_runner/server/pipeline_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/safeds_runner/server/pipeline_manager.py b/src/safeds_runner/server/pipeline_manager.py index b509a5e..e032b25 100644 --- a/src/safeds_runner/server/pipeline_manager.py +++ b/src/safeds_runner/server/pipeline_manager.py @@ -41,7 +41,7 @@ def __init__(self) -> None: @cached_property def _multiprocessing_manager(self) -> SyncManager: - multiprocessing.set_start_method('spawn') + multiprocessing.set_start_method("spawn") return multiprocessing.Manager() @cached_property From b36edf35fea3c3767f183177cd207d478653b8ca Mon Sep 17 00:00:00 2001 From: WinPlay02 Date: Mon, 4 Dec 2023 22:43:23 +0100 Subject: [PATCH 3/4] fix: only set spawn method, when needed --- src/safeds_runner/server/pipeline_manager.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/safeds_runner/server/pipeline_manager.py b/src/safeds_runner/server/pipeline_manager.py index b509a5e..8624dd4 100644 --- a/src/safeds_runner/server/pipeline_manager.py +++ b/src/safeds_runner/server/pipeline_manager.py @@ -35,13 +35,14 @@ class PipelineManager: """ def __init__(self) -> None: - """Create a new PipelineManager object, which needs to be started by calling startup().""" + """Create a new PipelineManager object, which is lazily started, when needed.""" self._placeholder_map: dict = {} self._websocket_target: simple_websocket.Server | None = None @cached_property def _multiprocessing_manager(self) -> SyncManager: - multiprocessing.set_start_method('spawn') + if multiprocessing.get_start_method() != 'spawn': + multiprocessing.set_start_method('spawn', True) return multiprocessing.Manager() @cached_property From 6b2d987f709baa45e8ff34d760082996d34f6d10 Mon Sep 17 00:00:00 2001 From: WinPlay02 Date: Tue, 5 Dec 2023 01:37:13 +0100 Subject: [PATCH 4/4] fix: race condition when initializing manager fix: more strict locking during mocking tests --- src/safeds_runner/server/pipeline_manager.py | 3 ++- tests/safeds_runner/server/test_websocket_mock.py | 14 +++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/safeds_runner/server/pipeline_manager.py b/src/safeds_runner/server/pipeline_manager.py index ba3c6f9..11743b5 100644 --- a/src/safeds_runner/server/pipeline_manager.py +++ b/src/safeds_runner/server/pipeline_manager.py @@ -42,7 +42,7 @@ def __init__(self) -> None: @cached_property def _multiprocessing_manager(self) -> SyncManager: if multiprocessing.get_start_method() != "spawn": - multiprocessing.set_start_method("spawn", True) + multiprocessing.set_start_method("spawn", force=True) return multiprocessing.Manager() @cached_property @@ -67,6 +67,7 @@ def _startup(self) -> None: This method should not be called during the bootstrap phase of the python interpreter, as it leads to a crash. """ + _mq = self._messages_queue # Initialize it here before starting a thread to avoid potential race condition if not self._messages_queue_thread.is_alive(): self._messages_queue_thread.start() diff --git a/tests/safeds_runner/server/test_websocket_mock.py b/tests/safeds_runner/server/test_websocket_mock.py index 08d0891..5796c7f 100644 --- a/tests/safeds_runner/server/test_websocket_mock.py +++ b/tests/safeds_runner/server/test_websocket_mock.py @@ -23,11 +23,11 @@ def __init__(self, messages: list[str]): self.received: list[str] = [] self.close_reason: int | None = None self.close_message: str | None = None - self.condition_variable = threading.Condition() + self.condition_variable = threading.Condition(lock=threading.Lock()) def send(self, msg: str) -> None: - self.received.append(msg) with self.condition_variable: + self.received.append(msg) self.condition_variable.notify_all() def receive(self) -> str | None: @@ -46,6 +46,10 @@ def wait_for_messages(self, wait_for_messages: int = 1) -> None: return self.condition_variable.wait(1.0) # this should not be needed, but it seems the process can get stuck + def get_next_received_message(self) -> str: + with self.condition_variable: + return self.received.pop(0) + @pytest.mark.parametrize( argnames="websocket_message,exception_message", @@ -208,7 +212,7 @@ def test_should_execute_pipeline_return_exception( mock_connection = MockWebsocketConnection(messages) ws_main(mock_connection, app_pipeline_manager) mock_connection.wait_for_messages(1) - exception_message = Message.from_dict(json.loads(mock_connection.received.pop(0))) + exception_message = Message.from_dict(json.loads(mock_connection.get_next_received_message())) assert exception_message.type == expected_response_runtime_error.type assert exception_message.id == expected_response_runtime_error.id @@ -293,7 +297,7 @@ def test_should_execute_pipeline_return_valid_placeholder( # And compare with expected responses while len(expected_responses) > 0: mock_connection.wait_for_messages(1) - next_message = Message.from_dict(json.loads(mock_connection.received.pop(0))) + next_message = Message.from_dict(json.loads(mock_connection.get_next_received_message())) assert next_message == expected_responses.pop(0) @@ -360,5 +364,5 @@ def test_should_successfully_execute_simple_flow(messages: list[str], expected_r mock_connection = MockWebsocketConnection(messages) ws_main(mock_connection, app_pipeline_manager) mock_connection.wait_for_messages(1) - query_result_invalid = Message.from_dict(json.loads(mock_connection.received.pop(0))) + query_result_invalid = Message.from_dict(json.loads(mock_connection.get_next_received_message())) assert query_result_invalid == expected_response