refactor: put pipeline operations in their own module
docs: added comments for functions
WinPlay02 committed Nov 20, 2023
1 parent 215df0b commit 9186587
Showing 3 changed files with 196 additions and 127 deletions.
7 changes: 3 additions & 4 deletions src/safeds_runner/server/main.py
@@ -9,8 +9,8 @@
from flask_sock import Sock

from safeds_runner.server import messages
from safeds_runner.server.module_manager import execute_pipeline, get_placeholder, set_new_websocket_target, \
start_message_queue_handling, setup_multiprocessing
from safeds_runner.server.pipeline_manager import execute_pipeline, get_placeholder, set_new_websocket_target, \
setup_pipeline_execution

app = Flask(__name__)
# Websocket Configuration
@@ -114,8 +114,7 @@ def send_websocket_message(connection, msg_type: str, msg_data):
parser = argparse.ArgumentParser(description="Start Safe-DS Runner on a specific port.")
parser.add_argument('--port', type=int, default=5000, help='Port on which to run the python server.')
args = parser.parse_args()
setup_multiprocessing()
start_message_queue_handling()
setup_pipeline_execution()
logging.info(f"Starting Safe-DS Runner on port {args.port}")
# Only bind to host=127.0.0.1. Connections from other devices should not be accepted
WSGIServer(('127.0.0.1', args.port), app).serve_forever()
141 changes: 18 additions & 123 deletions src/safeds_runner/server/module_manager.py
@@ -1,35 +1,19 @@
import importlib.abc
import multiprocessing
import threading
import queue
from abc import ABC
from importlib.machinery import ModuleSpec
import sys
import importlib.util
import types
import runpy
import logging
import typing
import json
import stack_data

# Multiprocessing
multiprocessing_manager = None
global_placeholder_map = {}
global_messages_queue: queue.Queue | None = None
# Threading
websocket_target = None
messages_queue_thread = None


def setup_multiprocessing():
global multiprocessing_manager, global_messages_queue
multiprocessing_manager = multiprocessing.Manager()
global_messages_queue = multiprocessing_manager.Queue()


class InMemoryLoader(importlib.abc.SourceLoader, ABC):
def __init__(self, code_bytes: bytes, filename: str):
"""
Loads a virtual Python module from a byte array and a filename.
:param code_bytes: Byte array containing the Python code of the module
:param filename: Name of the file this module originates from
"""
self.code_bytes = code_bytes
self.filename = filename

@@ -42,6 +26,11 @@ def get_filename(self, fullname) -> str:

class InMemoryFinder(importlib.abc.MetaPathFinder):
def __init__(self, code: dict[str, dict[str, str]]):
"""
Finds Python modules in an in-memory dictionary
:param code: A dictionary containing the code to be executed, grouped by module
path; each module path maps module names to their code
"""
self.code = code
self.allowed_packages = {key for key in code.keys()}
self.imports_to_remove = set()
@@ -79,113 +68,19 @@ def find_spec(self, fullname: str, path=None, target: types.ModuleType | None =
origin=parent_package)
return None

def attach(self):
def attach(self) -> None:
"""
Attaches this finder to the meta path
"""
sys.meta_path.append(self)

def detach(self):
def detach(self) -> None:
"""
Removes modules found by this finder and removes this finder from the meta path
"""
# As modules should not be used by other modules outside our pipeline,
# it should be safe to just remove all newly imported modules
for key in self.imports_to_remove:
if key in sys.modules.keys():
del sys.modules[key]
sys.meta_path.remove(self)


class PipelineProcess:
def __init__(self, code: dict[str, dict[str, str]], sdspackage: str, sdsmodule: str, sdspipeline: str,
execution_id: str, messages_queue: queue.Queue, placeholder_map: dict[str, typing.Any]):
self.code = code
self.sdspackage = sdspackage
self.sdsmodule = sdsmodule
self.sdspipeline = sdspipeline
self.id = execution_id
self.messages_queue = messages_queue
self.placeholder_map = placeholder_map
self.process = multiprocessing.Process(target=self._execute, daemon=True)

def _send_message(self, message_type: str, value: dict[typing.Any, typing.Any] | str) -> None:
global global_messages_queue
self.messages_queue.put({"type": message_type, "id": self.id, "data": value})

def _send_exception(self, exception: BaseException):
backtrace = get_backtrace_info(exception)
self._send_message("runtime_error", {"message": exception.__str__(), "backtrace": backtrace})

def save_placeholder(self, placeholder_name: str, value: typing.Any) -> None:
self.placeholder_map[placeholder_name] = value

def _execute(self):
logging.info(f"Executing {self.sdspackage}.{self.sdsmodule}.{self.sdspipeline}...")
self.save_placeholder("abc", "deg")
pipeline_finder = InMemoryFinder(self.code)
pipeline_finder.attach()
main_module = f"gen_{self.sdsmodule}_{self.sdspipeline}"
try:
runpy.run_module(main_module, run_name="__main__") # TODO Is the Safe-DS-Package relevant here?
self._send_message("progress", "done")
except BaseException as error:
self._send_exception(error)
finally:
pipeline_finder.detach()

def execute(self):
self.process.start()


def get_backtrace_info(error: BaseException) -> list[dict[str, typing.Any]]:
backtrace_list = []
for frame in stack_data.core.FrameInfo.stack_data(error.__traceback__):
backtrace_list.append({"file": frame.filename, "line": str(frame.lineno)})
return backtrace_list


def execute_pipeline(code: dict[str, dict[str, str]], sdspackage: str, sdsmodule: str, sdspipeline: str, exec_id: str):
global multiprocessing_manager, global_messages_queue, global_placeholder_map
if exec_id not in global_placeholder_map:
global_placeholder_map[exec_id] = multiprocessing_manager.dict()
process = PipelineProcess(code, sdspackage, sdsmodule, sdspipeline, exec_id, global_messages_queue,
global_placeholder_map[exec_id])
process.execute()


def _get_placeholder_type(value: typing.Any):
if isinstance(value, bool):
return "Boolean"
if isinstance(value, float):
return "Float"
if isinstance(value, int):
return "Int"
if isinstance(value, str):
return "String"
if isinstance(value, object):
return type(value).__name__
return "Any"


def get_placeholder(exec_id: str, placeholder_name: str) -> (str | None, typing.Any):
global global_placeholder_map
if exec_id not in global_placeholder_map:
return None, None
if placeholder_name not in global_placeholder_map[exec_id]:
return None, None
value = global_placeholder_map[exec_id][placeholder_name]
return _get_placeholder_type(value), value


def handle_queue_messages():
global websocket_target
while True:
message = global_messages_queue.get()
if websocket_target is not None:
websocket_target.send(json.dumps(message))


def start_message_queue_handling():
global messages_queue_thread
messages_queue_thread = threading.Thread(target=handle_queue_messages, daemon=True)
messages_queue_thread.start()


def set_new_websocket_target(ws):
global websocket_target
websocket_target = ws
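
After this refactor, module_manager.py keeps only the in-memory import machinery (InMemoryLoader and InMemoryFinder). Below is a minimal sketch of how the finder is typically driven, mirroring PipelineProcess._execute above; the code dictionary layout and module name are invented for illustration and assume top-level modules live under the empty-string path:

import runpy

from safeds_runner.server.module_manager import InMemoryFinder

# Hypothetical in-memory code layout: module path -> module name -> source code
code = {"": {"gen_demo_pipeline": "print('hello from an in-memory module')"}}

finder = InMemoryFinder(code)
finder.attach()  # register the finder on sys.meta_path
try:
    # Run the generated entry module, as PipelineProcess._execute does
    runpy.run_module("gen_demo_pipeline", run_name="__main__")
finally:
    finder.detach()  # drop modules imported through this finder and unregister it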
175 changes: 175 additions & 0 deletions src/safeds_runner/server/pipeline_manager.py
@@ -0,0 +1,175 @@
import queue
import multiprocessing
import threading
import json
import typing
import runpy
import stack_data
import logging
from safeds_runner.server.module_manager import InMemoryFinder

# Multiprocessing
multiprocessing_manager = None
global_placeholder_map = {}
global_messages_queue: queue.Queue | None = None
# Message Queue
websocket_target = None
messages_queue_thread = None


def setup_pipeline_execution() -> None:
"""
Prepares the runner for running Safe-DS pipelines.
First, structures shared between processes are created; after that, a message queue handling thread is started in
the main process. This allows messages to be received directly from one of the pipeline processes and relayed
to the extension connection.
"""
# Multiprocessing
global multiprocessing_manager, global_messages_queue
multiprocessing_manager = multiprocessing.Manager()
global_messages_queue = multiprocessing_manager.Queue()
# Message Queue
global messages_queue_thread
messages_queue_thread = threading.Thread(target=_handle_queue_messages, daemon=True)
messages_queue_thread.start()


def _handle_queue_messages():
global websocket_target
while True:
message = global_messages_queue.get()
if websocket_target is not None:
websocket_target.send(json.dumps(message))


def set_new_websocket_target(ws) -> None:
"""
Inform the message queue handling thread that the websocket connection has changed.
:param ws: New websocket connection
"""
global websocket_target
websocket_target = ws


class PipelineProcess:
def __init__(self, code: dict[str, dict[str, str]], sdspackage: str, sdsmodule: str, sdspipeline: str,
execution_id: str, messages_queue: queue.Queue, placeholder_map: dict[str, typing.Any]):
"""
Represents a process that executes a Safe-DS pipeline.
:param code: A dictionary containing the code to be executed, in a virtual filesystem
:param sdspackage: Safe-DS package name
:param sdsmodule: Safe-DS module name
:param sdspipeline: Safe-DS main pipeline name
:param execution_id: Unique ID to identify this process
:param messages_queue: A queue to write outgoing messages to
:param placeholder_map: A map to save calculated placeholders in
"""
self.code = code
self.sdspackage = sdspackage
self.sdsmodule = sdsmodule
self.sdspipeline = sdspipeline
self.id = execution_id
self.messages_queue = messages_queue
self.placeholder_map = placeholder_map
self.process = multiprocessing.Process(target=self._execute, daemon=True)

def _send_message(self, message_type: str, value: dict[typing.Any, typing.Any] | str) -> None:
global global_messages_queue
self.messages_queue.put({"type": message_type, "id": self.id, "data": value})

def _send_exception(self, exception: BaseException) -> None:
backtrace = get_backtrace_info(exception)
self._send_message("runtime_error", {"message": exception.__str__(), "backtrace": backtrace})

def save_placeholder(self, placeholder_name: str, value: typing.Any) -> None:
"""
Save a calculated placeholder in the map
:param placeholder_name: Name of the placeholder
:param value: Actual value of the placeholder
"""
self.placeholder_map[placeholder_name] = value

def _execute(self) -> None:
logging.info(f"Executing {self.sdspackage}.{self.sdsmodule}.{self.sdspipeline}...")
pipeline_finder = InMemoryFinder(self.code)
pipeline_finder.attach()
main_module = f"gen_{self.sdsmodule}_{self.sdspipeline}"
try:
runpy.run_module(main_module, run_name="__main__") # TODO Is the Safe-DS-Package relevant here?
self._send_message("progress", "done")
except BaseException as error:
self._send_exception(error)
finally:
pipeline_finder.detach()

def execute(self) -> None:
"""
Executes this pipeline in a newly created process and communicates results, progress and errors back
to the main process
"""
self.process.start()


def get_backtrace_info(error: BaseException) -> list[dict[str, typing.Any]]:
"""
Creates a simplified backtrace from an exception
:param error: Caught exception
:return: List containing file and line information for each stack frame
"""
backtrace_list = []
for frame in stack_data.core.FrameInfo.stack_data(error.__traceback__):
backtrace_list.append({"file": frame.filename, "line": int(frame.lineno)})
return backtrace_list


def execute_pipeline(code: dict[str, dict[str, str]], sdspackage: str, sdsmodule: str, sdspipeline: str,
exec_id: str) -> None:
"""
Runs a Safe-DS pipeline
:param code: A dictionary containing the code to be executed, in a virtual filesystem
:param sdspackage: Safe-DS package name
:param sdsmodule: Safe-DS module name
:param sdspipeline: Safe-DS main pipeline name
:param exec_id: Unique ID to identify this execution
"""
global multiprocessing_manager, global_messages_queue, global_placeholder_map
if exec_id not in global_placeholder_map:
global_placeholder_map[exec_id] = multiprocessing_manager.dict()
process = PipelineProcess(code, sdspackage, sdsmodule, sdspipeline, exec_id, global_messages_queue,
global_placeholder_map[exec_id])
process.execute()


def _get_placeholder_type(value: typing.Any):
"""
:param value: Any Python object
:return: Safe-DS type name corresponding to the given Python object instance
"""
if isinstance(value, bool):
return "Boolean"
if isinstance(value, float):
return "Float"
if isinstance(value, int):
return "Int"
if isinstance(value, str):
return "String"
if isinstance(value, object):
return type(value).__name__
return "Any"


def get_placeholder(exec_id: str, placeholder_name: str) -> (str | None, typing.Any):
"""
Gets a placeholder type and value for an execution id and placeholder name
:param exec_id: Unique id identifying execution
:param placeholder_name: Name of the placeholder
:return: Tuple containing placeholder type and placeholder value, or (None, None) if the placeholder does not exist
"""
global global_placeholder_map
if exec_id not in global_placeholder_map:
return None, None
if placeholder_name not in global_placeholder_map[exec_id]:
return None, None
value = global_placeholder_map[exec_id][placeholder_name]
return _get_placeholder_type(value), value
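
Taken together, the new pipeline_manager module exposes the small surface that main.py now drives. A hedged usage sketch follows; the code dictionary, names, and execution ID are invented for illustration (in practice they arrive from the extension over the websocket), and the empty-string module path is an assumption:

from safeds_runner.server.pipeline_manager import (
    execute_pipeline,
    get_placeholder,
    setup_pipeline_execution,
)

setup_pipeline_execution()  # create shared structures and start the message queue thread

# Hypothetical payload: module path -> module name -> source code
code = {"": {"gen_demo_mypipeline": "print('pipeline running')"}}

# Spawns a daemon PipelineProcess; progress and runtime errors are sent back via the queue
execute_pipeline(code, "demo", "demo", "mypipeline", "exec-1")

# Once the pipeline has saved a placeholder, it can be looked up by execution ID and name
placeholder_type, value = get_placeholder("exec-1", "some_placeholder")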
