diff --git a/gnes/cli/api.py b/gnes/cli/api.py index 6528c9aa..376e24dd 100644 --- a/gnes/cli/api.py +++ b/gnes/cli/api.py @@ -47,10 +47,7 @@ def route(args): def frontend(args): from ..service.frontend import FrontendService - import threading - with FrontendService(args): - forever = threading.Event() - forever.wait() + _start_service(FrontendService, args) def client(args): diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index 39d5d40f..59a8f392 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -1,11 +1,18 @@ from collections import OrderedDict, defaultdict +from contextlib import ExitStack +from functools import wraps from typing import Union, Tuple, List from ..cli.parser import set_router_parser, set_indexer_parser, \ set_frontend_parser, set_preprocessor_parser, \ set_encoder_parser from ..helper import set_logger -from ..service.base import SocketType, BaseService, BetterEnum +from ..service.base import SocketType, BaseService, BetterEnum, ServiceManager +from ..service.encoder import EncoderService +from ..service.frontend import FrontendService +from ..service.indexer import IndexerService +from ..service.preprocessor import PreprocessorService +from ..service.router import RouterService class Service(BetterEnum): @@ -16,47 +23,77 @@ class Service(BetterEnum): Preprocessor = 4 +def _build_level(required_level: 'Flow.BuildLevel'): + def __build_level(func): + @wraps(func) + def arg_wrapper(self, *args, **kwargs): + if hasattr(self, '_build_level'): + if self._build_level.value >= required_level.value: + return func(self, *args, **kwargs) + else: + raise ValueError('build_level check failed for %s, required: %s, actual: %s' % ( + func.__name__, required_level, self._build_level)) + else: + raise AttributeError('%r has no attribute "_build_level"' % self) + + return arg_wrapper + + return __build_level + + class Flow: _supported_orch = {'swarm', 'k8s'} - _supported_service = { + _service2parser = { Service.Encoder: set_encoder_parser(), Service.Router: set_router_parser(), Service.Indexer: set_indexer_parser(), Service.Frontend: set_frontend_parser(), - Service.Preprocessor: set_preprocessor_parser() + Service.Preprocessor: set_preprocessor_parser(), } + _service2builder = { + Service.Encoder: lambda x: ServiceManager(EncoderService, x), + Service.Router: lambda x: ServiceManager(RouterService, x), + Service.Indexer: lambda x: ServiceManager(IndexerService, x), + Service.Preprocessor: lambda x: ServiceManager(PreprocessorService, x), + Service.Frontend: FrontendService, + } + + class BuildLevel(BetterEnum): + EMPTY = 0 + GRAPH = 1 + RUNTIME = 2 def __init__(self, with_frontend: bool = True, **kwargs): self.logger = set_logger(self.__class__.__name__) self._service_nodes = OrderedDict() self._service_edges = {} - self._service_name_counter = {k: 0 for k in self._supported_service.keys()} + self._service_name_counter = {k: 0 for k in self._service2parser.keys()} + self._service_contexts = [] self._last_add_service = None self._common_kwargs = kwargs self._frontend = None - self._is_built = False + self._client = None + self._build_level = Flow.BuildLevel.EMPTY + self._backend = None if with_frontend: - self.add(Service.Frontend) + self.add_frontend() else: - self.logger.warning('with_frontend is set to False, you need to add frontend service by yourself') + self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself') + @_build_level(BuildLevel.GRAPH) def to_yaml(self, orchestration: str) -> str: if orchestration not in Flow._supported_orch: raise TypeError( - '%s is not valid type of orchestration, should be one of %s' % (orchestration, self._supported_orch)) + '%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch)) @staticmethod def from_yaml(orchestration: str) -> 'Flow': if orchestration not in Flow._supported_orch: raise TypeError( - '%s is not valid type of orchestration, should be one of %s' % (orchestration, self._supported_orch)) - - def _check_is_built(self): - if not self._is_built: - raise ValueError('this flow is not built yet, please call build() first') + '%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch)) + @_build_level(BuildLevel.GRAPH) def to_mermaid(self, left_right: bool = True): - self._check_is_built() mermaid_graph = OrderedDict() for k in self._service_nodes.keys(): mermaid_graph[k] = [] @@ -97,14 +134,48 @@ def index(self): def query(self): pass + def add_frontend(self, *args, **kwargs) -> 'Flow': + """Add a frontend to the current flow, a shortcut of add(Service.Frontend) + Usually you dont need to call this function explicitly, a flow object contains a frontend service by default. + This function is useful when you build a flow without the frontend and want to customize the frontend later. + """ + return self.add(Service.Frontend, *args, **kwargs) + + def add_encoder(self, *args, **kwargs) -> 'Flow': + """Add an encoder to the current flow, a shortcut of add(Service.Encoder)""" + return self.add(Service.Encoder, *args, **kwargs) + + def add_indexer(self, *args, **kwargs) -> 'Flow': + """Add an indexer to the current flow, a shortcut of add(Service.Indexer)""" + return self.add(Service.Indexer, *args, **kwargs) + + def add_preprocessor(self, *args, **kwargs) -> 'Flow': + """Add a router to the current flow, a shortcut of add(Service.Preprocessor)""" + return self.add(Service.Preprocessor, *args, **kwargs) + + def add_router(self, *args, **kwargs) -> 'Flow': + """Add a preprocessor to the current flow, a shortcut of add(Service.Router)""" + return self.add(Service.Router, *args, **kwargs) + def add(self, service: 'Service', name: str = None, service_in: Union[str, Tuple[str], List[str], 'Service'] = None, service_out: Union[str, Tuple[str], List[str], 'Service'] = None, **kwargs) -> 'Flow': - - if service not in Flow._supported_service: - raise ValueError('service: %s is not supported, should be one of %s' % (service, self._supported_service)) + """ + Add a service to the current flow object + :param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend + :param name: the name indentifier of the service, useful in 'service_in' and 'service_out' + :param service_in: the name of the service(s) that this service receives data from. + One can also use 'Service.Frontend' to indicate the connection with the frontend. + :param service_out: the name of the service(s) that this service sends data to. + One can also use 'Service.Frontend' to indicate the connection with the frontend. + :param kwargs: other keyword-value arguments that the service CLI supports + :return: the current flow object + """ + + if service not in Flow._service2parser: + raise ValueError('service: %s is not supported, should be one of %s' % (service, self._service2parser)) if name in self._service_nodes: raise ValueError('name: %s is used in this Flow already!' % name) @@ -119,44 +190,50 @@ def add(self, service: 'Service', raise ValueError('frontend is already in this Flow') self._frontend = name + service_in = self._parse_service_endpoints(name, service_in, connect_to_last_service=True) + service_out = self._parse_service_endpoints(name, service_out, connect_to_last_service=False) + + args, p_args = self._get_parsed_args(service, kwargs) + + self._service_nodes[name] = { + 'service': service, + 'parsed_args': p_args, + 'args': args, + 'incomes': service_in, + 'outgoings': service_out} + + # direct all income services' output to the current service + for s in service_in: + self._service_nodes[s]['outgoings'].add(name) + for s in service_out: + self._service_nodes[s]['incomes'].add(name) + + self._last_add_service = name + + return self + + def _parse_service_endpoints(self, cur_service_name, service_endpoint, connect_to_last_service=False): # parsing service_in - if isinstance(service_in, str): - service_in = [service_in] - elif service_in == Service.Frontend: - service_in = [self._frontend] - elif not service_in: - if self._last_add_service: - service_in = [self._last_add_service] + if isinstance(service_endpoint, str): + service_endpoint = [service_endpoint] + elif service_endpoint == Service.Frontend: + service_endpoint = [self._frontend] + elif not service_endpoint: + if self._last_add_service and connect_to_last_service: + service_endpoint = [self._last_add_service] else: - service_in = [] - - if isinstance(service_in, list) or isinstance(service_in, tuple): - for s in service_in: - if s == name: + service_endpoint = [] + if isinstance(service_endpoint, list) or isinstance(service_endpoint, tuple): + for s in service_endpoint: + if s == cur_service_name: raise ValueError('the income of a service can not be itself') if s not in self._service_nodes: raise ValueError('service_in: %s can not be found in this Flow' % s) else: - raise ValueError('service_in=%s is not parsable' % service_in) - - # parsing service_out - if isinstance(service_out, str): - service_out = [service_out] - elif service_out == Service.Frontend: - service_out = [self._frontend] - elif not service_out: - service_out = [] - - if isinstance(service_out, list) or isinstance(service_out, tuple): - for s in service_out: - if s == name: - raise ValueError('the outcome of a service can not be itself') - if s not in self._service_nodes: - raise ValueError( - 'service_out: %s can not be found in this Flow yet, maybe you need to add it first?' % s) - else: - raise ValueError('service_out=%s is not parsable' % service_out) + raise ValueError('service_in=%s is not parsable' % service_endpoint) + return set(service_endpoint) + def _get_parsed_args(self, service, kwargs): kwargs.update(self._common_kwargs) args = [] for k, v in kwargs.items(): @@ -173,38 +250,18 @@ def add(self, service: 'Service', args.append('--no_%s' % k) else: args.extend(['--%s' % k, str(v)]) - try: - p_args, unknown_args = Flow._supported_service[service].parse_known_args(args) + p_args, unknown_args = Flow._service2parser[service].parse_known_args(args) if unknown_args: self.logger.warning('not sure what these arguments are: %s' % unknown_args) except SystemExit: raise ValueError('bad arguments for service "%s", ' - 'you may want to recheck your args "%s"' % (service, args)) - - service_in = set(service_in) - service_out = set(service_out) + 'you may want to double check your args "%s"' % (service, args)) + return args, p_args - self._service_nodes[name] = { - 'service': service, - 'parsed_args': p_args, - 'args': args, - 'incomes': service_in, - 'outgoings': service_out} - - # direct all income services' output to the current service - for s in service_in: - self._service_nodes[s]['outgoings'].add(name) - for s in service_out: - self._service_nodes[s]['incomes'].add(name) - - self._last_add_service = name - - return self - - def build(self): + def _build_graph(self) -> 'Flow': if not self._frontend: - raise ValueError('frontend do not exist') + raise ValueError('frontend does not exist, you may need to add_frontend()') if not self._last_add_service or not self._service_nodes: raise ValueError('flow is empty?') @@ -276,5 +333,36 @@ def build(self): 'this type of topology is ambiguous and should not exist, i can not determine the socket type' % ( len(edges_with_same_start), start_node, len(edges_with_same_end), end_node)) - self._is_built = True + self._build_level = Flow.BuildLevel.GRAPH + return self + + def build(self, backend='thread', *args, **kwargs) -> 'Flow': + self._build_graph() + if backend in {'thread', 'process'}: + self._service_contexts.clear() + for v in self._service_nodes.values(): + v['parsed_args'].parallel_backend = backend + s = self._service2builder[v['service']](v['parsed_args']) + self._service_contexts.append(s) + else: + raise NotImplementedError('backend=%s is not supported yet' % backend) + self._build_level = Flow.BuildLevel.RUNTIME return self + + def __call__(self, *args, **kwargs): + return self.build(*args, **kwargs) + + def __enter__(self): + if self._build_level.value < Flow.BuildLevel.RUNTIME.value: + self.logger.warning( + 'current build_level=%s, lower than required. ' + 'build the flow now via build() with default parameters' % self._build_level) + self.build() + self._service_stack = ExitStack() + for k in self._service_contexts: + self._service_stack.enter_context(k) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if hasattr(self, '_service_stack'): + self._service_stack.close() diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 215f85ef..b74c2727 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -38,14 +38,23 @@ def __init__(self, args): self.bind_address = '{0}:{1}'.format(args.grpc_host, args.grpc_port) self.server.add_insecure_port(self.bind_address) + self._stop_event = threading.Event() def __enter__(self): self.server.start() self.logger.critical('listening at: %s' % self.bind_address) + self._stop_event.clear() return self def __exit__(self, exc_type, exc_val, exc_tb): self.server.stop(None) + self.stop() + + def stop(self): + self._stop_event.set() + + def join(self): + self._stop_event.wait() class _Servicer(gnes_pb2_grpc.GnesRPCServicer): diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index e1b0e4aa..10180bf7 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -11,6 +11,12 @@ def test_flow1(self): print(f._service_edges) print(f.to_mermaid()) + def test_flow1_ctx(self): + f = (Flow(check_version=False, route_table=True) + .add(gfs.Router, yaml_path='BaseRouter')) + with f(backend='process'): + pass + def test_flow2(self): f = (Flow(check_version=False, route_table=True) .add(gfs.Router, yaml_path='BaseRouter') diff --git a/tests/test_service_mgr.py b/tests/test_service_mgr.py index 0bd40ad0..224233ae 100644 --- a/tests/test_service_mgr.py +++ b/tests/test_service_mgr.py @@ -19,6 +19,18 @@ def setUp(self): os.unsetenv('http_proxy') os.unsetenv('https_proxy') + def test_frontend_alone(self): + args = set_frontend_parser().parse_args([ + '--grpc_host', '127.0.0.1', + + ]) + + with FrontendService(args): + pass + + with ServiceManager(FrontendService, args): + pass + def _test_multiple_router(self, backend='thread', num_parallel=5): a = set_router_parser().parse_args([ '--yaml_path', 'BaseRouter',