refactor: separate service to manage processes (#97)
### Summary of Changes

* Add a separate service to manage processes (see the sketch below)
* Move memoization logic into its own package
* Only handle websocket connections in the server
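
The new process-management service itself (`_process_manager.py`) is among the files not shown in this excerpt; the diff below only reveals the interface that `PipelineManager` now relies on (`create_shared_dict`, `get_queue`, `submit`). The following is a minimal sketch of such a service, assuming it simply takes over the multiprocessing manager, shared queue, and worker pool that are removed from `_pipeline_manager.py` further down; everything beyond that interface is an assumption, not the actual implementation.

```python
# Hypothetical sketch of the process-management service; only create_shared_dict,
# get_queue, and submit are taken from the diff below, the rest is assumed.
from __future__ import annotations

import multiprocessing
import queue
from collections.abc import Callable
from concurrent.futures import Future, ProcessPoolExecutor
from functools import cached_property
from multiprocessing.managers import SyncManager
from typing import Any


class ProcessManager:
    """Owns the shared state and worker processes that PipelineManager previously managed itself."""

    @cached_property
    def _manager(self) -> SyncManager:
        # "spawn" keeps behavior consistent across platforms, as in the removed code below.
        return multiprocessing.get_context("spawn").Manager()

    @cached_property
    def _pool(self) -> ProcessPoolExecutor:
        return ProcessPoolExecutor(max_workers=4, mp_context=multiprocessing.get_context("spawn"))

    @cached_property
    def _queue(self) -> queue.Queue:
        # Single queue shared by all pipeline processes to report results and progress.
        return self._manager.Queue()

    def create_shared_dict(self) -> dict:
        # Dictionaries shared between the main process and pipeline processes
        # (used for placeholders and the memoization map).
        return self._manager.dict()

    def get_queue(self) -> queue.Queue:
        return self._queue

    def submit(self, func: Callable[..., Any], *args: Any) -> Future:
        # Run a pipeline entry point in a worker process.
        return self._pool.submit(func, *args)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True, cancel_futures=True)
        self._manager.shutdown()
```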

---------

Co-authored-by: megalinter-bot <[email protected]>
lars-reimann and megalinter-bot authored Apr 28, 2024
1 parent 7b1aff6 commit e19a315
Showing 20 changed files with 326 additions and 145 deletions.
2 changes: 2 additions & 0 deletions src/safeds_runner/main.py
@@ -1,5 +1,7 @@
"""The main entry point of the application."""

+from __future__ import annotations

from safeds_runner.cli import cli


1 change: 1 addition & 0 deletions src/safeds_runner/memoization/__init__.py
@@ -0,0 +1 @@
"""Memoization of function calls."""
src/safeds_runner/server/_memoization_map.py → src/safeds_runner/memoization/_memoization_map.py
@@ -9,9 +9,9 @@

import psutil

-from safeds_runner.server._memoization_stats import MemoizationStats
-from safeds_runner.server._memoization_strategies import STAT_ORDER_PRIORITY
-from safeds_runner.server._memoization_utils import (
+from safeds_runner.memoization._memoization_stats import MemoizationStats
+from safeds_runner.memoization._memoization_strategies import STAT_ORDER_PRIORITY
+from safeds_runner.memoization._memoization_utils import (
MemoizationKey,
_create_memoization_key,
_get_size_of_value,
src/safeds_runner/server/_memoization_strategies.py → src/safeds_runner/memoization/_memoization_strategies.py
@@ -3,7 +3,7 @@
from collections.abc import Callable
from typing import TypeAlias

-from safeds_runner.server._memoization_stats import MemoizationStats
+from safeds_runner.memoization._memoization_stats import MemoizationStats

# Callable = Stat Key Extractor
# A value removal strategy will reorder a list of memoized functions, based on the provided stats for each function.
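
For code that imported these private modules directly, the move changes the import path from `safeds_runner.server` to `safeds_runner.memoization`. A before/after sketch, assuming such direct imports exist and no compatibility re-exports are added (none are visible in this excerpt):

```python
# Before this commit:
# from safeds_runner.server._memoization_map import MemoizationMap
# from safeds_runner.server._memoization_stats import MemoizationStats

# After this commit:
from safeds_runner.memoization._memoization_map import MemoizationMap
from safeds_runner.memoization._memoization_stats import MemoizationStats
```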
2 changes: 2 additions & 0 deletions src/safeds_runner/server/_json_encoder.py
@@ -1,5 +1,7 @@
"""Module containing JSON encoding utilities for Safe-DS types."""

+from __future__ import annotations

import base64
import json
import math
8 changes: 6 additions & 2 deletions src/safeds_runner/server/_module_manager.py
@@ -1,13 +1,17 @@
"""Module that contains the infrastructure for finding and loading modules in-memory."""

+from __future__ import annotations

import importlib.abc
import importlib.util
import logging
import sys
-import types
+import typing
from abc import ABC
-from importlib.machinery import ModuleSpec

+if typing.TYPE_CHECKING:
+    import types
+    from importlib.machinery import ModuleSpec


class InMemoryLoader(importlib.abc.SourceLoader, ABC):
126 changes: 21 additions & 105 deletions src/safeds_runner/server/_pipeline_manager.py
@@ -1,31 +1,27 @@
"""Module that contains the infrastructure for pipeline execution in child processes."""

-import asyncio
-import json
+from __future__ import annotations

import linecache
import logging
-import multiprocessing
import os
-import queue
import runpy
-import threading
+import typing
-from concurrent.futures import ProcessPoolExecutor
from functools import cached_property
-from multiprocessing.managers import SyncManager
from pathlib import Path
from typing import Any

import stack_data

-from ._memoization_map import MemoizationMap
-from ._memoization_utils import (
+from safeds_runner.memoization._memoization_map import MemoizationMap
+from safeds_runner.memoization._memoization_utils import (
ExplicitIdentityWrapper,
ExplicitIdentityWrapperLazy,
_has_explicit_identity_memory,
_is_deterministically_hashable,
_is_not_primitive,
)

from ._messages import (
Message,
ProgramMessageData,
@@ -38,6 +34,11 @@
)
from ._module_manager import InMemoryFinder

+if typing.TYPE_CHECKING:
+    import queue
+
+    from ._process_manager import ProcessManager


class PipelineManager:
"""
@@ -47,92 +48,17 @@ class PipelineManager:
subprocess and the main process using a shared message queue.
"""

-    def __init__(self) -> None:
+    def __init__(self, process_manager: ProcessManager) -> None:
        """Create a new PipelineManager object, which is lazily started, when needed."""
+        self._process_manager = process_manager
        self._placeholder_map: dict = {}
-        self._websocket_target: list[asyncio.Queue] = []

-    @cached_property
-    def _multiprocessing_manager(self) -> SyncManager:
-        if multiprocessing.get_start_method() != "spawn":
-            multiprocessing.set_start_method("spawn", force=True)
-        return multiprocessing.Manager()

-    @cached_property
-    def _messages_queue(self) -> queue.Queue[Message]:
-        return self._multiprocessing_manager.Queue()

-    @cached_property
-    def _process_pool(self) -> ProcessPoolExecutor:
-        return ProcessPoolExecutor(max_workers=4, mp_context=multiprocessing.get_context("spawn"))

-    @cached_property
-    def _messages_queue_thread(self) -> threading.Thread:
-        return threading.Thread(target=self._handle_queue_messages, daemon=True, args=(asyncio.get_event_loop(),))

    @cached_property
    def _memoization_map(self) -> MemoizationMap:
-        return MemoizationMap(self._multiprocessing_manager.dict(), self._multiprocessing_manager.dict())  # type: ignore[arg-type]

-    def startup(self) -> None:
-        """
-        Prepare the runner for running Safe-DS pipelines.
-        Firstly, structures shared between processes are lazily created.
-        After that a message queue handling thread is started in the main process.
-        This allows receiving messages directly from one of the pipeline processes and relaying information
-        directly to the websocket connection (to the VS Code extension).
-        This method should not be called during the bootstrap phase of the python interpreter, as it leads to a crash.
-        """
-        _mq = self._messages_queue  # Initialize it here before starting a thread to avoid potential race condition
-        if not self._messages_queue_thread.is_alive():
-            self._messages_queue_thread.start()
-        # Ensure that pool is started
-        _pool = self._process_pool

-    def _handle_queue_messages(self, event_loop: asyncio.AbstractEventLoop) -> None:
-        """
-        Relay messages from pipeline processes to the currently connected websocket endpoint.
-        Should be used in a dedicated thread.
-        Parameters
-        ----------
-        event_loop:
-            Event Loop that handles websocket connections.
-        """
-        try:
-            while self._messages_queue is not None:
-                message = self._messages_queue.get()
-                message_encoded = json.dumps(message.to_dict())
-                # only send messages to the same connection once
-                for connection in set(self._websocket_target):
-                    asyncio.run_coroutine_threadsafe(connection.put(message_encoded), event_loop)
-        except BaseException as error:  # noqa: BLE001  # pragma: no cover
-            logging.warning("Message queue terminated: %s", error.__repr__())  # pragma: no cover

-    def connect(self, websocket_connection_queue: asyncio.Queue) -> None:
-        """
-        Add a websocket connection queue to relay event messages to, which are occurring during pipeline execution.
-        Parameters
-        ----------
-        websocket_connection_queue:
-            Message Queue for a websocket connection.
-        """
-        self._websocket_target.append(websocket_connection_queue)

-    def disconnect(self, websocket_connection_queue: asyncio.Queue) -> None:
-        """
-        Remove a websocket target connection queue to no longer receive messages.
-        Parameters
-        ----------
-        websocket_connection_queue:
-            Message Queue for a websocket connection to be removed.
-        """
-        self._websocket_target.remove(websocket_connection_queue)
+        return MemoizationMap(
+            self._process_manager.create_shared_dict(),  # type: ignore[arg-type]
+            self._process_manager.create_shared_dict(),  # type: ignore[arg-type]
+        )

def execute_pipeline(
self,
@@ -149,17 +75,16 @@ def execute_pipeline(
execution_id:
Unique ID to identify this execution.
"""
-        self.startup()
        if execution_id not in self._placeholder_map:
-            self._placeholder_map[execution_id] = self._multiprocessing_manager.dict()
+            self._placeholder_map[execution_id] = self._process_manager.create_shared_dict()
process = PipelineProcess(
pipeline,
execution_id,
-            self._messages_queue,
+            self._process_manager.get_queue(),
self._placeholder_map[execution_id],
self._memoization_map,
)
-        process.execute(self._process_pool)
+        process.execute(self._process_manager)

def get_placeholder(self, execution_id: str, placeholder_name: str) -> tuple[str | None, Any]:
"""
@@ -186,15 +111,6 @@ def get_placeholder(self, execution_id: str, placeholder_name: str) -> tuple[str
value = value.value
return _get_placeholder_type(value), value

-    def shutdown(self) -> None:
-        """
-        Shut down the multiprocessing manager to end the used subprocess.
-        This should only be called if this PipelineManager is not intended to be reused again.
-        """
-        self._multiprocessing_manager.shutdown()
-        self._process_pool.shutdown(wait=True, cancel_futures=True)


class PipelineProcess:
"""A process that executes a Safe-DS pipeline."""
@@ -316,13 +232,13 @@ def _catch_subprocess_error(self, error: BaseException) -> None:
# This is a callback to log an unexpected failure, executing this is never expected
logging.exception("Pipeline process unexpectedly failed", exc_info=error) # pragma: no cover

-    def execute(self, pool: ProcessPoolExecutor) -> None:
+    def execute(self, process_manager: ProcessManager) -> None:
"""
Execute this pipeline in a process from the provided process pool.
Results, progress and errors are communicated back to the main process.
"""
-        future = pool.submit(self._execute)
+        future = process_manager.submit(self._execute)
exception = future.exception()
if exception is not None:
self._catch_subprocess_error(exception) # pragma: no cover
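
Taken together, the diff turns `PipelineManager` into a consumer of an injected `ProcessManager`: it no longer creates the multiprocessing manager, worker pool, message queue, or websocket relays itself. A rough sketch of the resulting wiring, inferred only from the constructor and calls visible above; the real server-side setup is in files not shown here.

```python
# Hypothetical wiring after the refactor; only the PipelineManager(process_manager)
# signature and the submit/get_queue/create_shared_dict calls come from the diff,
# the no-argument ProcessManager() constructor and the rest are assumptions.
from safeds_runner.server._pipeline_manager import PipelineManager
from safeds_runner.server._process_manager import ProcessManager

process_manager = ProcessManager()
pipeline_manager = PipelineManager(process_manager)

# The server keeps the websocket handling and asks the pipeline manager to run code:
# pipeline_manager.execute_pipeline(program_data, execution_id)  # arguments as defined elsewhere
# Messages from pipeline processes arrive on process_manager.get_queue() and are
# relayed to websocket clients by the server, no longer by PipelineManager.
```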