From 7d2c681ec42d1026b8a37c0a26e744e5ae8a7863 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Sat, 12 Oct 2019 16:39:43 +0800 Subject: [PATCH 1/6] fix(flow): add warning to jpg downloader --- gnes/flow/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index 5aa6653a..6572feb6 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -238,7 +238,7 @@ def to_url(self, **kwargs) -> str: return 'https://mermaidjs.github.io/mermaid-live-editor/#/view/%s' % encoded_str @_build_level(BuildLevel.GRAPH) - def to_jpg(self, path: str = 'flow.jpg', **kwargs): + def to_jpg(self, path: str = 'flow.jpg', **kwargs) -> None: """ Rendering the current flow as a jpg image, this will call :py:meth:`to_mermaid` and it needs internet connection @@ -249,7 +249,9 @@ def to_jpg(self, path: str = 'flow.jpg', **kwargs): from urllib.request import Request, urlopen encoded_str = self.to_url().replace('https://mermaidjs.github.io/mermaid-live-editor/#/view/', '') - self.logger.info('saving jpg...') + self.logger.warning('jpg exporting relies on https://mermaid.ink/, but it is not very stable. 
' + 'some syntax are not supported, please use with caution.') + self.logger.info('downloading as jpg...') req = Request('https://mermaid.ink/img/%s' % encoded_str, headers={'User-Agent': 'Mozilla/5.0'}) with open(path, 'wb') as fp: fp.write(urlopen(req).read()) From b94490f17baf78c871478b2f7f68bffc28bf9393 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Mon, 14 Oct 2019 11:42:55 +0800 Subject: [PATCH 2/6] feat(flow): allow add service to be str --- gnes/flow/__init__.py | 7 +++++-- gnes/service/base.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index 6572feb6..4dd41bf9 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -320,7 +320,7 @@ def add_router(self, *args, **kwargs) -> 'Flow': """Add a router to the current flow, a shortcut of :py:meth:`add(Service.Router)`""" return self.add(Service.Router, *args, **kwargs) - def add(self, service: 'Service', + def add(self, service: Union['Service', str], name: str = None, service_in: Union[str, Tuple[str], List[str], 'Service'] = None, service_out: Union[str, Tuple[str], List[str], 'Service'] = None, @@ -329,7 +329,7 @@ def add(self, service: 'Service', """ Add a service to the current flow object and return the new modified flow object - :param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend + :param service: a 'Service' enum or string, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend :param name: the name indentifier of the service, useful in 'service_in' and 'service_out' :param service_in: the name of the service(s) that this service receives data from. One can also use 'Service.Frontend' to indicate the connection with the frontend. 
@@ -342,6 +342,9 @@ def add(self, service: 'Service', op_flow = copy.deepcopy(self) if copy_flow else self + if isinstance(service, str): + service = Service.from_string(service) + if service not in Flow._service2parser: raise ValueError('service: %s is not supported, should be one of %s' % (service, Flow._service2parser)) diff --git a/gnes/service/base.py b/gnes/service/base.py index 7da06d83..c9dcec75 100644 --- a/gnes/service/base.py +++ b/gnes/service/base.py @@ -42,7 +42,7 @@ def from_string(cls, s): try: return cls[s] except KeyError: - raise ValueError() + raise ValueError('%s is not a valid enum for %s' % (s, cls)) class ReduceOp(BetterEnum): From 9ca757b406cd1ac0477b26e52168732baba46eb7 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Mon, 14 Oct 2019 14:43:24 +0800 Subject: [PATCH 3/6] feat(flow): add set remove and set_last --- gnes/flow/__init__.py | 161 +++++++++++++++++++++++++++++++++++++--- tests/test_gnes_flow.py | 36 +++++++++ 2 files changed, 187 insertions(+), 10 deletions(-) diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index 4dd41bf9..cf4f09a3 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -33,6 +33,10 @@ class FlowTopologyError(ValueError): """Exception when the topology is ambiguous""" +class FlowMissingNode(ValueError): + """Exception when a referenced service node does not exist in the flow""" + + class FlowBuildLevelMismatch(ValueError): """Exception when required level is higher than the current build level""" @@ -113,7 +117,7 @@ def __init__(self, with_frontend: bool = True, **kwargs): self._service_edges = {} self._service_name_counter = {k: 0 for k in Flow._service2parser.keys()} self._service_contexts = [] - self._last_add_service = None + self._last_changed_service = [] self._common_kwargs = kwargs self._frontend = None self._client = None @@ -320,6 +324,140 @@ def add_router(self, *args, **kwargs) -> 'Flow': """Add a router to the current flow, a shortcut of :py:meth:`add(Service.Router)`""" return self.add(Service.Router, *args, 
**kwargs) + def set_last_service(self, name: str, copy_flow: bool = True) -> 'Flow': + """ + Set a service as the last service in the flow, useful when modifying the flow. + + :param name: the name of the existing service + :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification + :return: a (new) flow object with modification + """ + op_flow = copy.deepcopy(self) if copy_flow else self + + if name not in op_flow._service_nodes: + raise FlowMissingNode('service_in: %s can not be found in this Flow' % name) + + if op_flow._last_changed_service and name == op_flow._last_changed_service[-1]: + pass + else: + op_flow._last_changed_service.append(name) + + # graph is now changed so we need to + # reset the build level to the lowest + op_flow._build_level = Flow.BuildLevel.EMPTY + + return op_flow + + def set(self, name: str, service_in: Union[str, Tuple[str], List[str], 'Service'] = None, + service_out: Union[str, Tuple[str], List[str], 'Service'] = None, + copy_flow: bool = True, + clear_old_attr: bool = False, + as_last_service: bool = False, + **kwargs) -> 'Flow': + """ + Set the attribute of an existing service (added by :py:meth:`add`) in the flow. + For the attributes or kwargs that aren't given, they will remain unchanged as before. + + :param name: the name of the existing service + :param service_in: the name of the service(s) that this service receives data from. + One can also use 'Service.Frontend' to indicate the connection with the frontend. + :param service_out: the name of the service(s) that this service sends data to. + One can also use 'Service.Frontend' to indicate the connection with the frontend. 
+ :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification + :param clear_old_attr: remove old attribute value before setting the new one + :param as_last_service: whether setting the changed service as the last service in the graph + :param kwargs: other keyword-value arguments that the service CLI supports + :return: a (new) flow object with modification + """ + op_flow = copy.deepcopy(self) if copy_flow else self + + if name not in op_flow._service_nodes: + raise FlowMissingNode('service_in: %s can not be found in this Flow' % name) + + node = op_flow._service_nodes[name] + service = node['service'] + + if service_in: + service_in = op_flow._parse_service_endpoints(op_flow, name, service_in, connect_to_last_service=True) + + if clear_old_attr: + node['incomes'] = service_in + # remove all edges point to this service + for n in op_flow._service_nodes.values(): + if name in n['outgoings']: + n['outgoings'].remove(name) + else: + node['incomes'] = node['incomes'].union(service_in) + + # add it the new edge back + for s in service_in: + op_flow._service_nodes[s]['outgoings'].add(name) + + if service_out: + service_out = op_flow._parse_service_endpoints(op_flow, name, service_out, connect_to_last_service=False) + node['outgoings'] = service_out + if clear_old_attr: + # remove all edges this service point to + for n in op_flow._service_nodes.values(): + if name in n['incomes']: + n['incomes'].remove(name) + else: + node['outgoings'] = node['outgoings'].union(service_out) + + for s in service_out: + op_flow._service_nodes[s]['incomes'].add(name) + + if kwargs: + if not clear_old_attr: + node['kwargs'].update(kwargs) + kwargs = node['kwargs'] + args, p_args = op_flow._get_parsed_args(op_flow, Flow._service2parser[service], kwargs) + node['args'] = args + node['parsed_args'] = p_args + + if as_last_service: + op_flow.set_last_service(name, False) + + # graph is now changed 
so we need to + # reset the build level to the lowest + op_flow._build_level = Flow.BuildLevel.EMPTY + + return op_flow + + def remove(self, name: str = None, copy_flow: bool = True) -> 'Flow': + """ + Remove a service from the flow. + + :param name: the name of the existing service + :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification + :return: a (new) flow object with modification + """ + + op_flow = copy.deepcopy(self) if copy_flow else self + + if name not in op_flow._service_nodes: + raise FlowMissingNode('service_in: %s can not be found in this Flow' % name) + + op_flow._service_nodes.pop(name) + + # remove all edges point to this service + for n in op_flow._service_nodes.values(): + if name in n['outgoings']: + n['outgoings'].remove(name) + if name in n['incomes']: + n['incomes'].remove(name) + + if op_flow._service_nodes: + op_flow._last_changed_service = [v for v in op_flow._last_changed_service if v != name] + else: + op_flow._last_changed_service = [] + + # graph is now changed so we need to + # reset the build level to the lowest + op_flow._build_level = Flow.BuildLevel.EMPTY + + return op_flow + def add(self, service: Union['Service', str], name: str = None, service_in: Union[str, Tuple[str], List[str], 'Service'] = None, @@ -327,10 +465,12 @@ def add(self, service: Union['Service', str], copy_flow: bool = True, **kwargs) -> 'Flow': """ - Add a service to the current flow object and return the new modified flow object + Add a service to the current flow object and return the new modified flow object. 
+ The attribute of the service can be later changed with :py:meth:`set` or deleted with :py:meth:`remove` :param service: a 'Service' enum or string, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend - :param name: the name indentifier of the service, useful in 'service_in' and 'service_out' + :param name: the name identifier of the service, can be used in 'service_in', + 'service_out', :py:meth:`set` and :py:meth:`remove`. :param service_in: the name of the service(s) that this service receives data from. One can also use 'Service.Frontend' to indicate the connection with the frontend. :param service_out: the name of the service(s) that this service sends data to. @@ -371,7 +511,8 @@ def add(self, service: Union['Service', str], 'parsed_args': p_args, 'args': args, 'incomes': service_in, - 'outgoings': service_out} + 'outgoings': service_out, + 'kwargs': kwargs} # direct all income services' output to the current service for s in service_in: @@ -379,7 +520,7 @@ def add(self, service: Union['Service', str], for s in service_out: op_flow._service_nodes[s]['incomes'].add(name) - op_flow._last_add_service = name + op_flow.set_last_service(name, False) # graph is now changed so we need to # reset the build level to the lowest @@ -395,8 +536,8 @@ def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connec elif service_endpoint == Service.Frontend: service_endpoint = [op_flow._frontend] elif not service_endpoint: - if op_flow._last_add_service and connect_to_last_service: - service_endpoint = [op_flow._last_add_service] + if op_flow._last_changed_service and connect_to_last_service: + service_endpoint = [op_flow._last_changed_service[-1]] else: service_endpoint = [] if isinstance(service_endpoint, list) or isinstance(service_endpoint, tuple): @@ -404,7 +545,7 @@ def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connec if s == cur_service_name: raise FlowTopologyError('the income of a service can not be 
itself') if s not in op_flow._service_nodes: - raise FlowTopologyError('service_in: %s can not be found in this Flow' % s) + raise FlowMissingNode('service_in: %s can not be found in this Flow' % s) else: raise ValueError('service_in=%s is not parsable' % service_endpoint) return set(service_endpoint) @@ -444,11 +585,11 @@ def _build_graph(self, copy_flow: bool) -> 'Flow': if not op_flow._frontend: raise FlowImcompleteError('frontend does not exist, you may need to add_frontend()') - if not op_flow._last_add_service or not op_flow._service_nodes: + if not op_flow._last_changed_service or not op_flow._service_nodes: raise FlowTopologyError('flow is empty?') # close the loop - op_flow._service_nodes[op_flow._frontend]['incomes'].add(op_flow._last_add_service) + op_flow._service_nodes[op_flow._frontend]['incomes'].add(op_flow._last_changed_service[-1]) # build all edges for k, v in op_flow._service_nodes.items(): diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index c7d962a7..3f8f63ca 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -176,3 +176,39 @@ def test_query_flow_plot(self): .add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml')) .add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'))) print(flow.build(backend=None).to_url()) + + def test_flow_add_set(self): + f = (Flow(check_version=False, route_table=True) + .add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=4) + .add(gfs.Encoder, yaml_path='PyTorchTransformers', replicas=3) + .add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer', replicas=2) + .add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', service_in='prep', replicas=2) + .add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter', + num_part=2, service_in=['vec_idx', 'doc_idx']) + .build(backend=None)) + + print(f.to_url()) + print(f.set('prep', replicas=1).build(backend=None).to_url()) + # 
make it as query flow + + f1 = (f + .remove('sync_barrier') + .remove('doc_idx') + .set_last_service('vec_idx') + .add_router('scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml')) + .add_indexer('doc_idx', yaml_path='DictIndexer', replicas=2) + .build(backend=None)) + + print(f1.to_url()) + + # another way to convert f to an index flow + + f2 = (f + .set_last_service('vec_idx') + .add_router('scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml')) + .set('doc_idx', service_in='scorer', yaml_path='DictIndexer', replicas=2, clear_old_attr=True) + .remove('sync_barrier') + .set_last_service('doc_idx') + .build(backend=None)) + + print(f2.to_url()) From f6536c87642430526686e1a0f7bf308997dac3a0 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Mon, 14 Oct 2019 14:59:09 +0800 Subject: [PATCH 4/6] feat(flow): add eq operator to the flow to enable comparison --- gnes/flow/__init__.py | 23 ++++++++++++++++++++++- tests/test_gnes_flow.py | 4 ++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index cf4f09a3..b7372f4c 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -589,7 +589,7 @@ def _build_graph(self, copy_flow: bool) -> 'Flow': raise FlowTopologyError('flow is empty?') # close the loop - op_flow._service_nodes[op_flow._frontend]['incomes'].add(op_flow._last_changed_service[-1]) + op_flow._service_nodes[op_flow._frontend]['incomes'] = {op_flow._last_changed_service[-1]} # build all edges for k, v in op_flow._service_nodes.items(): @@ -726,3 +726,24 @@ def __getstate__(self): def __setstate__(self, d): self.__dict__.update(d) self.logger = set_logger(self.__class__.__name__) + + def __eq__(self, other): + """ + Comparing the topology of a flow with another flow. + Identification is defined by whether two flows share the same set of edges. 
+ + :param other: the second flow object + :return: True if both flows have the same set of edges, False otherwise + """ + + if self._build_level.value < Flow.BuildLevel.GRAPH.value: + a = self.build(backend=None, copy_flow=True) + else: + a = self + + if other._build_level.value < Flow.BuildLevel.GRAPH.value: + b = other.build(backend=None, copy_flow=True) + else: + b = other + + return a._service_edges == b._service_edges diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index 3f8f63ca..39aa49cd 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -212,3 +212,7 @@ def test_flow_add_set(self): .build(backend=None)) print(f2.to_url()) + + self.assertEqual(f1, f2) + + self.assertNotEqual(f1, f2.add_router('dummy', yaml_path='BaseRouter')) From 228a2b196e82bb270e659b869a37210c14c09358 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Mon, 14 Oct 2019 15:36:29 +0800 Subject: [PATCH 5/6] fix(flow): fix unit test assert in flow --- gnes/flow/__init__.py | 17 +++++------------ tests/test_gnes_flow.py | 4 ++-- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index b7372f4c..8470bb0e 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -90,7 +90,7 @@ class Flow: You can change this behavior by giving an argument `copy_flow=False`. 
""" - _supported_orch = {'swarm', 'k8s'} + _service2parser = { Service.Encoder: set_encoder_parser, Service.Router: set_router_parser, @@ -129,19 +129,12 @@ def __init__(self, with_frontend: bool = True, **kwargs): self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself') @_build_level(BuildLevel.GRAPH) - def to_yaml(self, orchestration: str) -> str: - if orchestration not in Flow._supported_orch: - raise TypeError( - '%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch)) - - @staticmethod - def from_yaml(orchestration: str) -> 'Flow': - if orchestration not in Flow._supported_orch: - raise TypeError( - '%s is not valid type of orchestration, should be one of %s' % (orchestration, Flow._supported_orch)) + def to_swarm_yaml(self) -> str: + swarm_yml = '' + return swarm_yml @_build_level(BuildLevel.GRAPH) - def to_mermaid(self, left_right: bool = True): + def to_mermaid(self, left_right: bool = True) -> str: """ Output the mermaid graph for visualization diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index 39aa49cd..8bdad805 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -2,7 +2,7 @@ import unittest from gnes.cli.parser import set_client_cli_parser -from gnes.flow import Flow, Service as gfs, FlowBuildLevelMismatch, FlowTopologyError +from gnes.flow import Flow, Service as gfs, FlowBuildLevelMismatch class TestGNESFlow(unittest.TestCase): @@ -49,7 +49,7 @@ def test_flow1(self): print('f: %r g: %r' % (f, g)) f.build() print(f.to_mermaid()) - self.assertRaises(FlowTopologyError, g.build) + g.build() def test_flow1_ctx_empty(self): f = (Flow(check_version=False, route_table=True) From 80cb530e6527d70e2d175ce85138dc931c93e375 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Mon, 14 Oct 2019 16:34:03 +0800 Subject: [PATCH 6/6] feat(flow): add flow to python generator --- gnes/flow/__init__.py | 64 +++++++++++++++++++++++++++++++++++++++++ 
tests/test_gnes_flow.py | 3 ++ 2 files changed, 67 insertions(+) diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index 8470bb0e..1f5d793a 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -112,6 +112,12 @@ class BuildLevel(BetterEnum): RUNTIME = 2 def __init__(self, with_frontend: bool = True, **kwargs): + """ + Create a new Flow object. + + :param with_frontend: adding frontend service to the flow + :param kwargs: keyword-value arguments that will be shared by all services + """ self.logger = set_logger(self.__class__.__name__) self._service_nodes = OrderedDict() self._service_edges = {} @@ -123,8 +129,10 @@ def __init__(self, with_frontend: bool = True, **kwargs): self._client = None self._build_level = Flow.BuildLevel.EMPTY self._backend = None + self._init_with_frontend = False if with_frontend: self.add_frontend(copy_flow=False) + self._init_with_frontend = True else: self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself') @@ -133,6 +141,61 @@ def to_swarm_yaml(self) -> str: swarm_yml = '' return swarm_yml + def to_python_code(self, indent: int = 4) -> str: + """ + Generate the python code of this flow + + :return: the generated python code + """ + py_code = ['from gnes.flow import Flow', ''] + kwargs = [] + if not self._init_with_frontend: + kwargs.append('with_frontend=False') + if self._common_kwargs: + kwargs.extend('%s=%s' % (k, v) for k, v in self._common_kwargs.items()) + py_code.append('f = (Flow(%s)' % (', '.join(kwargs))) + + known_service = set() + last_add_name = '' + + for k, v in self._service_nodes.items(): + kwargs = OrderedDict() + kwargs['service'] = str(v['service']) + kwargs['name'] = k + kwargs['service_in'] = '[%s]' % ( + ','.join({'\'%s\'' % k for k in v['incomes'] if k in known_service})) + if kwargs['service_in'] == '[\'%s\']' % last_add_name: + kwargs.pop('service_in') + kwargs['service_out'] = '[%s]' % (','.join({'\'%s\'' % k for k in v['outgoings'] if k in 
known_service})) + + known_service.add(k) + last_add_name = k + + py_code.append('%s.add(%s)' % ( + ' ' * indent, + ', '.join( + '%s=%s' % (kk, '\'%s\'' % vv if isinstance(vv, str) + and not vv.startswith('\'') and not vv.startswith('[') + else vv) for kk, vv + in + (list(kwargs.items()) + list(v['kwargs'].items())) if + vv and vv != '[]' and kk not in self._common_kwargs))) + + py_code[-1] += ')' + + py_code.extend(['', + '# build the flow and visualize it', + 'f.build(backend=None).to_url()' + ]) + py_code.extend(['', + '# use this flow in multi-thread mode for indexing', + 'with f.build(backend=\'thread\') as fl:', + '%sfl.index(txt_file=\'test.txt\')' % (' ' * indent) + ]) + py_code.append('') + + return '\n'.join(py_code) + @_build_level(BuildLevel.GRAPH) def to_mermaid(self, left_right: bool = True) -> str: """ @@ -407,6 +470,7 @@ def set(self, name: str, service_in: Union[str, Tuple[str], List[str], 'Service' args, p_args = op_flow._get_parsed_args(op_flow, Flow._service2parser[service], kwargs) node['args'] = args node['parsed_args'] = p_args + node['kwargs'] = kwargs if as_last_service: op_flow.set_last_service(name, False) diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index 8bdad805..d98ec37f 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -216,3 +216,6 @@ def test_flow_add_set(self): self.assertEqual(f1, f2) self.assertNotEqual(f1, f2.add_router('dummy', yaml_path='BaseRouter')) + + print(f1.to_python_code()) + print(f.to_python_code())