Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
feat(flow): add context manager to flow
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Oct 8, 2019
1 parent ae0d405 commit 43b9d01
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 78 deletions.
5 changes: 1 addition & 4 deletions gnes/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ def route(args):

def frontend(args):
from ..service.frontend import FrontendService
import threading
with FrontendService(args):
forever = threading.Event()
forever.wait()
_start_service(FrontendService, args)


def client(args):
Expand Down
236 changes: 162 additions & 74 deletions gnes/flow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from collections import OrderedDict, defaultdict
from contextlib import ExitStack
from functools import wraps
from typing import Union, Tuple, List

from ..cli.parser import set_router_parser, set_indexer_parser, \
set_frontend_parser, set_preprocessor_parser, \
set_encoder_parser
from ..helper import set_logger
from ..service.base import SocketType, BaseService, BetterEnum
from ..service.base import SocketType, BaseService, BetterEnum, ServiceManager
from ..service.encoder import EncoderService
from ..service.frontend import FrontendService
from ..service.indexer import IndexerService
from ..service.preprocessor import PreprocessorService
from ..service.router import RouterService


class Service(BetterEnum):
Expand All @@ -16,47 +23,77 @@ class Service(BetterEnum):
Preprocessor = 4


def _build_level(required_level: 'Flow.BuildLevel'):
def __build_level(func):
@wraps(func)
def arg_wrapper(self, *args, **kwargs):
if hasattr(self, '_build_level'):
if self._build_level.value >= required_level.value:
return func(self, *args, **kwargs)
else:
raise ValueError('build_level check failed for %s, required: %s, actual: %s' % (
func.__name__, required_level, self._build_level))
else:
raise AttributeError('%r has no attribute "_build_level"' % self)

return arg_wrapper

return __build_level


class Flow:
_supported_orch = {'swarm', 'k8s'}
_supported_service = {
_service2parser = {
Service.Encoder: set_encoder_parser(),
Service.Router: set_router_parser(),
Service.Indexer: set_indexer_parser(),
Service.Frontend: set_frontend_parser(),
Service.Preprocessor: set_preprocessor_parser()
Service.Preprocessor: set_preprocessor_parser(),
}
_service2builder = {
Service.Encoder: lambda x: ServiceManager(EncoderService, x),
Service.Router: lambda x: ServiceManager(RouterService, x),
Service.Indexer: lambda x: ServiceManager(IndexerService, x),
Service.Preprocessor: lambda x: ServiceManager(PreprocessorService, x),
Service.Frontend: FrontendService,
}

class BuildLevel(BetterEnum):
EMPTY = 0
GRAPH = 1
RUNTIME = 2

def __init__(self, with_frontend: bool = True, **kwargs):
self.logger = set_logger(self.__class__.__name__)
self._service_nodes = OrderedDict()
self._service_edges = {}
self._service_name_counter = {k: 0 for k in self._supported_service.keys()}
self._service_name_counter = {k: 0 for k in self._service2parser.keys()}
self._service_contexts = []
self._last_add_service = None
self._common_kwargs = kwargs
self._frontend = None
self._is_built = False
self._client = None
self._build_level = Flow.BuildLevel.EMPTY
self._backend = None
if with_frontend:
self.add(Service.Frontend)
self.add_frontend()
else:
self.logger.warning('with_frontend is set to False, you need to add frontend service by yourself')
self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself')

@_build_level(BuildLevel.GRAPH)
def to_yaml(self, orchestration: str) -> str:
if orchestration not in Flow._supported_orch:
raise TypeError(
'%s is not valid type of orchestration, should be one of %s' % (orchestration, self._supported_orch))
'%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch))

@staticmethod
def from_yaml(orchestration: str) -> 'Flow':
if orchestration not in Flow._supported_orch:
raise TypeError(
'%s is not valid type of orchestration, should be one of %s' % (orchestration, self._supported_orch))

def _check_is_built(self):
if not self._is_built:
raise ValueError('this flow is not built yet, please call build() first')
'%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch))

@_build_level(BuildLevel.GRAPH)
def to_mermaid(self, left_right: bool = True):
self._check_is_built()
mermaid_graph = OrderedDict()
for k in self._service_nodes.keys():
mermaid_graph[k] = []
Expand Down Expand Up @@ -97,14 +134,48 @@ def index(self):
def query(self):
pass

def add_frontend(self, *args, **kwargs) -> 'Flow':
"""Add a frontend to the current flow, a shortcut of add(Service.Frontend)
Usually you dont need to call this function explicitly, a flow object contains a frontend service by default.
This function is useful when you build a flow without the frontend and want to customize the frontend later.
"""
return self.add(Service.Frontend, *args, **kwargs)

def add_encoder(self, *args, **kwargs) -> 'Flow':
"""Add an encoder to the current flow, a shortcut of add(Service.Encoder)"""
return self.add(Service.Encoder, *args, **kwargs)

def add_indexer(self, *args, **kwargs) -> 'Flow':
"""Add an indexer to the current flow, a shortcut of add(Service.Indexer)"""
return self.add(Service.Indexer, *args, **kwargs)

def add_preprocessor(self, *args, **kwargs) -> 'Flow':
"""Add a router to the current flow, a shortcut of add(Service.Preprocessor)"""
return self.add(Service.Preprocessor, *args, **kwargs)

def add_router(self, *args, **kwargs) -> 'Flow':
"""Add a preprocessor to the current flow, a shortcut of add(Service.Router)"""
return self.add(Service.Router, *args, **kwargs)

def add(self, service: 'Service',
name: str = None,
service_in: Union[str, Tuple[str], List[str], 'Service'] = None,
service_out: Union[str, Tuple[str], List[str], 'Service'] = None,
**kwargs) -> 'Flow':

if service not in Flow._supported_service:
raise ValueError('service: %s is not supported, should be one of %s' % (service, self._supported_service))
"""
Add a service to the current flow object
:param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
:param name: the name indentifier of the service, useful in 'service_in' and 'service_out'
:param service_in: the name of the service(s) that this service receives data from.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param service_out: the name of the service(s) that this service sends data to.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param kwargs: other keyword-value arguments that the service CLI supports
:return: the current flow object
"""

if service not in Flow._service2parser:
raise ValueError('service: %s is not supported, should be one of %s' % (service, self._service2parser))

if name in self._service_nodes:
raise ValueError('name: %s is used in this Flow already!' % name)
Expand All @@ -119,44 +190,50 @@ def add(self, service: 'Service',
raise ValueError('frontend is already in this Flow')
self._frontend = name

service_in = self._parse_service_endpoints(name, service_in, connect_to_last_service=True)
service_out = self._parse_service_endpoints(name, service_out, connect_to_last_service=False)

args, p_args = self._get_parsed_args(service, kwargs)

self._service_nodes[name] = {
'service': service,
'parsed_args': p_args,
'args': args,
'incomes': service_in,
'outgoings': service_out}

# direct all income services' output to the current service
for s in service_in:
self._service_nodes[s]['outgoings'].add(name)
for s in service_out:
self._service_nodes[s]['incomes'].add(name)

self._last_add_service = name

return self

def _parse_service_endpoints(self, cur_service_name, service_endpoint, connect_to_last_service=False):
# parsing service_in
if isinstance(service_in, str):
service_in = [service_in]
elif service_in == Service.Frontend:
service_in = [self._frontend]
elif not service_in:
if self._last_add_service:
service_in = [self._last_add_service]
if isinstance(service_endpoint, str):
service_endpoint = [service_endpoint]
elif service_endpoint == Service.Frontend:
service_endpoint = [self._frontend]
elif not service_endpoint:
if self._last_add_service and connect_to_last_service:
service_endpoint = [self._last_add_service]
else:
service_in = []

if isinstance(service_in, list) or isinstance(service_in, tuple):
for s in service_in:
if s == name:
service_endpoint = []
if isinstance(service_endpoint, list) or isinstance(service_endpoint, tuple):
for s in service_endpoint:
if s == cur_service_name:
raise ValueError('the income of a service can not be itself')
if s not in self._service_nodes:
raise ValueError('service_in: %s can not be found in this Flow' % s)
else:
raise ValueError('service_in=%s is not parsable' % service_in)

# parsing service_out
if isinstance(service_out, str):
service_out = [service_out]
elif service_out == Service.Frontend:
service_out = [self._frontend]
elif not service_out:
service_out = []

if isinstance(service_out, list) or isinstance(service_out, tuple):
for s in service_out:
if s == name:
raise ValueError('the outcome of a service can not be itself')
if s not in self._service_nodes:
raise ValueError(
'service_out: %s can not be found in this Flow yet, maybe you need to add it first?' % s)
else:
raise ValueError('service_out=%s is not parsable' % service_out)
raise ValueError('service_in=%s is not parsable' % service_endpoint)
return set(service_endpoint)

def _get_parsed_args(self, service, kwargs):
kwargs.update(self._common_kwargs)
args = []
for k, v in kwargs.items():
Expand All @@ -173,38 +250,18 @@ def add(self, service: 'Service',
args.append('--no_%s' % k)
else:
args.extend(['--%s' % k, str(v)])

try:
p_args, unknown_args = Flow._supported_service[service].parse_known_args(args)
p_args, unknown_args = Flow._service2parser[service].parse_known_args(args)
if unknown_args:
self.logger.warning('not sure what these arguments are: %s' % unknown_args)
except SystemExit:
raise ValueError('bad arguments for service "%s", '
'you may want to recheck your args "%s"' % (service, args))

service_in = set(service_in)
service_out = set(service_out)
'you may want to double check your args "%s"' % (service, args))
return args, p_args

self._service_nodes[name] = {
'service': service,
'parsed_args': p_args,
'args': args,
'incomes': service_in,
'outgoings': service_out}

# direct all income services' output to the current service
for s in service_in:
self._service_nodes[s]['outgoings'].add(name)
for s in service_out:
self._service_nodes[s]['incomes'].add(name)

self._last_add_service = name

return self

def build(self):
def _build_graph(self) -> 'Flow':
if not self._frontend:
raise ValueError('frontend do not exist')
raise ValueError('frontend does not exist, you may need to add_frontend()')

if not self._last_add_service or not self._service_nodes:
raise ValueError('flow is empty?')
Expand Down Expand Up @@ -276,5 +333,36 @@ def build(self):
'this type of topology is ambiguous and should not exist, i can not determine the socket type' % (
len(edges_with_same_start), start_node, len(edges_with_same_end), end_node))

self._is_built = True
self._build_level = Flow.BuildLevel.GRAPH
return self

def build(self, backend='thread', *args, **kwargs) -> 'Flow':
self._build_graph()
if backend in {'thread', 'process'}:
self._service_contexts.clear()
for v in self._service_nodes.values():
v['parsed_args'].parallel_backend = backend
s = self._service2builder[v['service']](v['parsed_args'])
self._service_contexts.append(s)
else:
raise NotImplementedError('backend=%s is not supported yet' % backend)
self._build_level = Flow.BuildLevel.RUNTIME
return self

def __call__(self, *args, **kwargs):
return self.build(*args, **kwargs)

def __enter__(self):
if self._build_level.value < Flow.BuildLevel.RUNTIME.value:
self.logger.warning(
'current build_level=%s, lower than required. '
'build the flow now via build() with default parameters' % self._build_level)
self.build()
self._service_stack = ExitStack()
for k in self._service_contexts:
self._service_stack.enter_context(k)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if hasattr(self, '_service_stack'):
self._service_stack.close()
9 changes: 9 additions & 0 deletions gnes/service/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,23 @@ def __init__(self, args):

self.bind_address = '{0}:{1}'.format(args.grpc_host, args.grpc_port)
self.server.add_insecure_port(self.bind_address)
self._stop_event = threading.Event()

def __enter__(self):
self.server.start()
self.logger.critical('listening at: %s' % self.bind_address)
self._stop_event.clear()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.server.stop(None)
self.stop()

def stop(self):
self._stop_event.set()

def join(self):
self._stop_event.wait()

class _Servicer(gnes_pb2_grpc.GnesRPCServicer):

Expand Down
6 changes: 6 additions & 0 deletions tests/test_gnes_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ def test_flow1(self):
print(f._service_edges)
print(f.to_mermaid())

def test_flow1_ctx(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter'))
with f(backend='process'):
pass

def test_flow2(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter')
Expand Down
12 changes: 12 additions & 0 deletions tests/test_service_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ def setUp(self):
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')

def test_frontend_alone(self):
args = set_frontend_parser().parse_args([
'--grpc_host', '127.0.0.1',

])

with FrontendService(args):
pass

with ServiceManager(FrontendService, args):
pass

def _test_multiple_router(self, backend='thread', num_parallel=5):
a = set_router_parser().parse_args([
'--yaml_path', 'BaseRouter',
Expand Down

0 comments on commit 43b9d01

Please sign in to comment.