From 14f5280375d6be221333f77b24d6166f27574c4f Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Wed, 9 Oct 2019 17:25:04 +0800 Subject: [PATCH] fix(parser): use str instead of textio stream to prevent serializer err --- gnes/cli/api.py | 2 +- gnes/cli/parser.py | 2 +- gnes/encoder/__init__.py | 3 +- gnes/flow/__init__.py | 197 ++++++++++++++++++++++++--------------- tests/test_gnes_flow.py | 27 +++++- 5 files changed, 149 insertions(+), 82 deletions(-) diff --git a/gnes/cli/api.py b/gnes/cli/api.py index 4a25a934..376e24dd 100644 --- a/gnes/cli/api.py +++ b/gnes/cli/api.py @@ -86,7 +86,7 @@ def _client_http(args): def _client_cli(args): from ..client.cli import CLIClient - CLIClient(args).start() + CLIClient(args) def compose(args): diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index f67b9ca2..f19e0d53 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -64,7 +64,7 @@ def resolve_yaml_path(path): # already a readable stream return path elif os.path.exists(path): - return open(path, encoding='utf8') + return path elif path.isidentifier(): # possible class name return io.StringIO('!%s {}' % path) diff --git a/gnes/encoder/__init__.py b/gnes/encoder/__init__.py index 11e7de52..84be4e7c 100644 --- a/gnes/encoder/__init__.py +++ b/gnes/encoder/__init__.py @@ -45,7 +45,8 @@ 'VggishEncoder': 'audio.vggish', 'YouTube8MFeatureExtractor': 'video.yt8m_feature_extractor', 'YouTube8MEncoder': 'video.yt8m_model', - 'QuantizerEncoder': 'numeric.quantizer' + 'QuantizerEncoder': 'numeric.quantizer', + 'CharEmbeddingEncoder': 'text.char' } register_all_class(_cls2file_map, 'encoder') diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index 95c2de55..1549f865 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -1,3 +1,4 @@ +import copy from collections import OrderedDict, defaultdict from contextlib import ExitStack from functools import wraps @@ -24,6 +25,18 @@ class Service(BetterEnum): Preprocessor = 4 +class FlowImcompleteError(ValueError): + """Exception when the flow missing some important component to run""" + + +class FlowTopologyError(ValueError): + """Exception when the topology is ambiguous""" + + +class FlowBuildLevelMismatch(ValueError): + """Exception when required level is higher than the current build level""" + + def _build_level(required_level: 'Flow.BuildLevel'): def __build_level(func): @wraps(func) @@ -32,8 +45,9 @@ def arg_wrapper(self, *args, **kwargs): if self._build_level.value >= required_level.value: return func(self, *args, **kwargs) else: - raise ValueError('build_level check failed for %s, required: %s, actual: %s' % ( - func.__name__, required_level, self._build_level)) + raise FlowBuildLevelMismatch( + 'build_level check failed for %r, required level: %s, actual level: %s' % ( + func, required_level, self._build_level)) else: raise AttributeError('%r has no attribute "_build_level"' % self) @@ -68,7 +82,7 @@ def __init__(self, with_frontend: bool = True, **kwargs): self.logger = set_logger(self.__class__.__name__) self._service_nodes = OrderedDict() self._service_edges = {} - self._service_name_counter = {k: 0 for k in self._service2parser.keys()} + self._service_name_counter = {k: 0 for k in Flow._service2parser.keys()} self._service_contexts = [] self._last_add_service = None self._common_kwargs = kwargs @@ -77,7 +91,7 @@ def __init__(self, with_frontend: bool = True, **kwargs): self._build_level = Flow.BuildLevel.EMPTY self._backend = None if with_frontend: - self.add_frontend() + self.add_frontend(copy_flow=False) else: self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself') @@ -137,7 +151,7 @@ def query(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs): @_build_level(BuildLevel.RUNTIME) def _call_client(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs): - args, p_args = self._get_parsed_args(set_client_cli_parser, kwargs) + args, p_args = self._get_parsed_args(self, set_client_cli_parser, kwargs) p_args.grpc_port = self._service_nodes[self._frontend]['parsed_args'].grpc_port p_args.grpc_host = self._service_nodes[self._frontend]['parsed_args'].grpc_host c = CLIClient(p_args, start_at_init=False) @@ -172,9 +186,12 @@ def add(self, service: 'Service', name: str = None, service_in: Union[str, Tuple[str], List[str], 'Service'] = None, service_out: Union[str, Tuple[str], List[str], 'Service'] = None, + copy_flow: bool = True, **kwargs) -> 'Flow': """ - Add a service to the current flow object + Add a service to the current flow object and return the new modified flow object + :param copy_flow: when set to true, then always copy the current flow + and do the modification on top of it then return, otherwise, do in-line modification :param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend :param name: the name indentifier of the service, useful in 'service_in' and 'service_out' :param service_in: the name of the service(s) that this service receives data from. @@ -182,31 +199,33 @@ def add(self, service: 'Service', :param service_out: the name of the service(s) that this service sends data to. One can also use 'Service.Frontend' to indicate the connection with the frontend. :param kwargs: other keyword-value arguments that the service CLI supports - :return: the current flow object + :return: a (new) flow object with modification """ + op_flow = copy.deepcopy(self) if copy_flow else self + if service not in Flow._service2parser: - raise ValueError('service: %s is not supported, should be one of %s' % (service, self._service2parser)) + raise ValueError('service: %s is not supported, should be one of %s' % (service, Flow._service2parser)) - if name in self._service_nodes: - raise ValueError('name: %s is used in this Flow already!' % name) + if name in op_flow._service_nodes: + raise FlowTopologyError('name: %s is used in this Flow already!' % name) if not name: - name = '%s%d' % (service, self._service_name_counter[service]) - self._service_name_counter[service] += 1 + name = '%s%d' % (service, op_flow._service_name_counter[service]) + op_flow._service_name_counter[service] += 1 if not name.isidentifier(): raise ValueError('name: %s is invalid, please follow the python variable name conventions' % name) if service == Service.Frontend: - if self._frontend: - raise ValueError('frontend is already in this Flow') - self._frontend = name + if op_flow._frontend: + raise FlowTopologyError('frontend is already in this Flow') + op_flow._frontend = name - service_in = self._parse_service_endpoints(name, service_in, connect_to_last_service=True) - service_out = self._parse_service_endpoints(name, service_out, connect_to_last_service=False) + service_in = op_flow._parse_service_endpoints(op_flow, name, service_in, connect_to_last_service=True) + service_out = op_flow._parse_service_endpoints(op_flow, name, service_out, connect_to_last_service=False) - args, p_args = self._get_parsed_args(Flow._service2parser[service], kwargs) + args, p_args = op_flow._get_parsed_args(op_flow, Flow._service2parser[service], kwargs) - self._service_nodes[name] = { + op_flow._service_nodes[name] = { 'service': service, 'parsed_args': p_args, 'args': args, @@ -215,41 +234,43 @@ def add(self, service: 'Service', # direct all income services' output to the current service for s in service_in: - self._service_nodes[s]['outgoings'].add(name) + op_flow._service_nodes[s]['outgoings'].add(name) for s in service_out: - self._service_nodes[s]['incomes'].add(name) + op_flow._service_nodes[s]['incomes'].add(name) - self._last_add_service = name + op_flow._last_add_service = name # graph is now changed so we need to # reset the build level to the lowest - self._build_level = Flow.BuildLevel.EMPTY + op_flow._build_level = Flow.BuildLevel.EMPTY - return self + return op_flow - def _parse_service_endpoints(self, cur_service_name, service_endpoint, connect_to_last_service=False): + @staticmethod + def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False): # parsing service_in if isinstance(service_endpoint, str): service_endpoint = [service_endpoint] elif service_endpoint == Service.Frontend: - service_endpoint = [self._frontend] + service_endpoint = [op_flow._frontend] elif not service_endpoint: - if self._last_add_service and connect_to_last_service: - service_endpoint = [self._last_add_service] + if op_flow._last_add_service and connect_to_last_service: + service_endpoint = [op_flow._last_add_service] else: service_endpoint = [] if isinstance(service_endpoint, list) or isinstance(service_endpoint, tuple): for s in service_endpoint: if s == cur_service_name: - raise ValueError('the income of a service can not be itself') - if s not in self._service_nodes: - raise ValueError('service_in: %s can not be found in this Flow' % s) + raise FlowTopologyError('the income of a service can not be itself') + if s not in op_flow._service_nodes: + raise FlowTopologyError('service_in: %s can not be found in this Flow' % s) else: raise ValueError('service_in=%s is not parsable' % service_endpoint) return set(service_endpoint) - def _get_parsed_args(self, service_arg_parser, kwargs): - kwargs.update(self._common_kwargs) + @staticmethod + def _get_parsed_args(op_flow, service_arg_parser, kwargs): + kwargs.update(op_flow._common_kwargs) args = [] for k, v in kwargs.items(): if isinstance(v, bool): @@ -268,36 +289,40 @@ def _get_parsed_args(self, service_arg_parser, kwargs): try: p_args, unknown_args = service_arg_parser().parse_known_args(args) if unknown_args: - self.logger.warning('not sure what these arguments are: %s' % unknown_args) + op_flow.logger.warning('not sure what these arguments are: %s' % unknown_args) except SystemExit: raise ValueError('bad arguments for service "%s", ' 'you may want to double check your args "%s"' % (service_arg_parser, args)) return args, p_args - def _build_graph(self) -> 'Flow': - if not self._frontend: - raise ValueError('frontend does not exist, you may need to add_frontend()') + def _build_graph(self, copy_flow: bool) -> 'Flow': + op_flow = copy.deepcopy(self) if copy_flow else self + + op_flow._service_edges.clear() + + if not op_flow._frontend: + raise FlowImcompleteError('frontend does not exist, you may need to add_frontend()') - if not self._last_add_service or not self._service_nodes: - raise ValueError('flow is empty?') + if not op_flow._last_add_service or not op_flow._service_nodes: + raise FlowTopologyError('flow is empty?') # close the loop - self._service_nodes[self._frontend]['incomes'].add(self._last_add_service) + op_flow._service_nodes[op_flow._frontend]['incomes'].add(op_flow._last_add_service) # build all edges - for k, v in self._service_nodes.items(): + for k, v in op_flow._service_nodes.items(): for s in v['incomes']: - self._service_edges['%s-%s' % (s, k)] = '' + op_flow._service_edges['%s-%s' % (s, k)] = '' for t in v['outgoings']: - self._service_edges['%s-%s' % (k, t)] = '' + op_flow._service_edges['%s-%s' % (k, t)] = '' - for k in self._service_edges.keys(): + for k in op_flow._service_edges.keys(): start_node, end_node = k.split('-') - edges_with_same_start = [ed for ed in self._service_edges.keys() if ed.startswith(start_node)] - edges_with_same_end = [ed for ed in self._service_edges.keys() if ed.endswith(end_node)] + edges_with_same_start = [ed for ed in op_flow._service_edges.keys() if ed.startswith(start_node)] + edges_with_same_end = [ed for ed in op_flow._service_edges.keys() if ed.endswith(end_node)] - s_pargs = self._service_nodes[start_node]['parsed_args'] - e_pargs = self._service_nodes[end_node]['parsed_args'] + s_pargs = op_flow._service_nodes[start_node]['parsed_args'] + e_pargs = op_flow._service_nodes[end_node]['parsed_args'] # Rule # if a node has multiple income/outgoing services, @@ -314,21 +339,21 @@ def _build_graph(self) -> 'Flow': s_pargs.host_out = BaseService.default_host e_pargs.socket_in = SocketType.SUB_CONNECT e_pargs.host_in = start_node - self._service_edges[k] = 'PUB-sub' + op_flow._service_edges[k] = 'PUB-sub' elif len(edges_with_same_end) > 1 and len(edges_with_same_start) == 1: s_pargs.socket_out = SocketType.PUSH_CONNECT s_pargs.host_out = end_node e_pargs.socket_in = SocketType.PULL_BIND e_pargs.host_in = BaseService.default_host - self._service_edges[k] = 'push-PULL' + op_flow._service_edges[k] = 'push-PULL' elif len(edges_with_same_start) == 1 and len(edges_with_same_end) == 1: # in this case, either side can be BIND # we prefer frontend to be always BIND # check if either node is frontend - if start_node == self._frontend: + if start_node == op_flow._frontend: s_pargs.socket_out = SocketType.PUSH_BIND e_pargs.socket_in = SocketType.PULL_CONNECT - elif end_node == self._frontend: + elif end_node == op_flow._frontend: s_pargs.socket_out = SocketType.PUSH_CONNECT e_pargs.socket_in = SocketType.PULL_BIND else: @@ -337,41 +362,49 @@ def _build_graph(self) -> 'Flow': if s_pargs.socket_out.is_bind: s_pargs.host_out = BaseService.default_host e_pargs.host_in = start_node - self._service_edges[k] = 'PUSH-pull' + op_flow._service_edges[k] = 'PUSH-pull' elif e_pargs.socket_in.is_bind: s_pargs.host_out = end_node e_pargs.host_in = BaseService.default_host - self._service_edges[k] = 'push-PULL' + op_flow._service_edges[k] = 'push-PULL' else: - raise ValueError('edge %s -> %s is ambiguous, at least one socket should be BIND') + raise FlowTopologyError('edge %s -> %s is ambiguous, at least one socket should be BIND') else: - raise ValueError('found %d edges start with %s and %d edges end with %s, ' - 'this type of topology is ambiguous and should not exist, i can not determine the socket type' % ( - len(edges_with_same_start), start_node, len(edges_with_same_end), end_node)) + raise FlowTopologyError('found %d edges start with %s and %d edges end with %s, ' + 'this type of topology is ambiguous and should not exist, ' + 'i can not determine the socket type' % ( + len(edges_with_same_start), start_node, len(edges_with_same_end), end_node)) - self._build_level = Flow.BuildLevel.GRAPH - return self + op_flow._build_level = Flow.BuildLevel.GRAPH + return op_flow + + def build(self, backend: Optional[str] = 'thread', copy_flow: bool = False, *args, **kwargs) -> 'Flow': + """ + Build the current flow and make it ready to use + :param backend: supported 'thread', 'process', 'swarm', 'k8s', 'shell' + :param copy_flow: return the copy of the current flow + :return: the current flow (by default) + """ + + op_flow = self._build_graph(copy_flow) - def build(self, backend: Optional[str] = 'thread', *args, **kwargs) -> 'Flow': - self._build_graph() if not backend: - self.logger.warning('no specified backend, build_level stays at %s, ' - 'and you can not run this flow.' % self._build_level) + op_flow.logger.warning('no specified backend, build_level stays at %s, ' + 'and you can not run this flow.' % op_flow._build_level) elif backend in {'thread', 'process'}: - self._service_contexts.clear() - for v in self._service_nodes.values(): + op_flow._service_contexts.clear() + for v in op_flow._service_nodes.values(): p_args = v['parsed_args'] p_args.parallel_backend = backend # for thread and process backend which runs locally, host_in and host_out should not be set p_args.host_in = BaseService.default_host p_args.host_out = BaseService.default_host - s = self._service2builder[v['service']](p_args) - self._service_contexts.append(s) - self._build_level = Flow.BuildLevel.RUNTIME + op_flow._service_contexts.append((Flow._service2builder[v['service']], p_args)) + op_flow._build_level = Flow.BuildLevel.RUNTIME else: raise NotImplementedError('backend=%s is not supported yet' % backend) - return self + return op_flow def __call__(self, *args, **kwargs): return self.build(*args, **kwargs) @@ -381,13 +414,29 @@ def __enter__(self): self.logger.warning( 'current build_level=%s, lower than required. ' 'build the flow now via build() with default parameters' % self._build_level) - self.build() + self.build(copy_flow=False) self._service_stack = ExitStack() - for k in self._service_contexts: - self._service_stack.enter_context(k) + for k, v in self._service_contexts: + self._service_stack.enter_context(k(v)) + + self.logger.critical('flow is built and ready, current build level is %s' % self._build_level) return self def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): if hasattr(self, '_service_stack'): self._service_stack.close() - self._build_level = Flow.BuildLevel.GRAPH + self._build_level = Flow.BuildLevel.EMPTY + self.logger.critical( + 'flow is closed and all resources should be released already, current build level is %s' % self._build_level) + + def __getstate__(self): + d = dict(self.__dict__) + del d['logger'] + return d + + def __setstate__(self, d): + self.__dict__.update(d) + self.logger = set_logger(self.__class__.__name__) diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index 6fa9dcaf..a76ed421 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -2,7 +2,7 @@ import unittest from gnes.cli.parser import set_client_cli_parser -from gnes.flow import Flow, Service as gfs +from gnes.flow import Flow, Service as gfs, FlowBuildLevelMismatch, FlowTopologyError class TestGNESFlow(unittest.TestCase): @@ -36,9 +36,20 @@ def tearDown(self): def test_flow1(self): f = (Flow(check_version=False, route_table=True) - .add(gfs.Router, yaml_path='BaseRouter').build()) - print(f._service_edges) + .add(gfs.Router, yaml_path='BaseRouter')) + g = f.add(gfs.Router, yaml_path='BaseRouter') + + print('f: %r g: %r' % (f, g)) + g.build() + print(g.to_mermaid()) + + f = f.add(gfs.Router, yaml_path='BaseRouter') + g = g.add(gfs.Router, yaml_path='BaseRouter') + + print('f: %r g: %r' % (f, g)) + f.build() print(f.to_mermaid()) + self.assertRaises(FlowTopologyError, g.build) def test_flow1_ctx_empty(self): f = (Flow(check_version=False, route_table=True) @@ -49,11 +60,17 @@ def test_flow1_ctx_empty(self): def test_flow1_ctx(self): flow = (Flow(check_version=False, route_table=False) .add(gfs.Router, yaml_path='BaseRouter')) - with flow(backend='process') as f, open(self.test_file) as fp: + with flow(backend='process', copy_flow=True) as f, open(self.test_file) as fp: f.index(txt_file=self.test_file, batch_size=4) - f.index(bytes_gen=(v.encode() for v in fp), batch_size=4) f.train(txt_file=self.test_file, batch_size=4) + with flow(backend='process', copy_flow=True) as f: + # change the flow inside build shall fail + f = f.add(gfs.Router, yaml_path='BaseRouter') + self.assertRaises(FlowBuildLevelMismatch, f.index, txt_file=self.test_file, batch_size=4) + + print(flow.build(backend=None).to_mermaid()) + def test_flow2(self): f = (Flow(check_version=False, route_table=True) .add(gfs.Router, yaml_path='BaseRouter')