From b34a765aa851ccabf45f73a740e82b95de3f1c1a Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Sat, 20 Jul 2019 12:40:24 +0800 Subject: [PATCH] feat(compose): add interactive mode of GNES board using Flask --- gnes/cli/api.py | 9 +++- gnes/cli/parser.py | 22 ++++++-- gnes/client/http.py | 2 +- gnes/composer/base.py | 62 ++++++++++++----------- gnes/composer/flask.py | 47 +++++++++++++++++ gnes/resources/compose/gnes-board.html | 30 ++++++++--- gnes/resources/config/compose/default.yml | 13 +++++ setup.py | 2 +- tests/test_compose.py | 16 ++++-- 9 files changed, 156 insertions(+), 47 deletions(-) create mode 100644 gnes/composer/flask.py create mode 100644 gnes/resources/config/compose/default.yml diff --git a/gnes/cli/api.py b/gnes/cli/api.py index d609a8a7..804adbe5 100644 --- a/gnes/cli/api.py +++ b/gnes/cli/api.py @@ -35,8 +35,13 @@ def route(args): def compose(args): - from ..composer.base import YamlGraph - YamlGraph(args).build_all() + from ..composer.base import YamlComposer + from ..composer.flask import YamlComposerFlask + + if args.flask: + YamlComposerFlask(args).run() + else: + YamlComposer(args).build_all() def frontend(args): diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index c90a5f56..8517b40d 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -34,6 +34,8 @@ def set_base_parser(): def set_composer_parser(parser=None): + from pkg_resources import resource_stream + if not parser: parser = set_base_parser() parser.add_argument('--port', @@ -45,8 +47,9 @@ def set_composer_parser(parser=None): default='GNES instance', help='name of the instance') parser.add_argument('--yaml_path', type=argparse.FileType('r'), - required=True, - help='yaml config of the service') + default=resource_stream( + 'gnes', '/'.join(('resources', 'config', 'compose', 'default.yml'))), + help='yaml config of the service') parser.add_argument('--html_path', type=argparse.FileType('w', encoding='utf8'), default='./gnes-board.html', help='output path of the HTML file, will contain all possible generations') @@ -69,6 +72,19 @@ def set_composer_parser(parser=None): return parser +def set_composer_flask_parser(parser=None): + if not parser: + parser = set_base_parser() + set_composer_parser(parser) + parser.add_argument('--flask', action='store_true', default=False, + help='using Flask to serve GNES composer in interactive mode') + parser.add_argument('--cors', type=str, default='*', + help='setting "Access-Control-Allow-Origin" for HTTP requests') + parser.add_argument('--http_port', type=int, default=8080, + help='server port for receiving HTTP requests') + return parser + + def set_service_parser(parser=None): from ..service.base import SocketType, BaseService if not parser: @@ -253,5 +269,5 @@ def get_main_parser(): set_preprocessor_service_parser(sp.add_parser('preprocess', help='start a preprocessor service')) set_http_service_parser(sp.add_parser('client_http', help='start a http service')) set_cli_client_parser(sp.add_parser('client_cli', help='start a grpc client')) - set_composer_parser(sp.add_parser('compose', help='start a GNES composer to simplify config generation')) + set_composer_flask_parser(sp.add_parser('compose', help='start a GNES composer to simplify config generation')) return parser diff --git a/gnes/client/http.py b/gnes/client/http.py index e85efaf4..fc1027cb 100644 --- a/gnes/client/http.py +++ b/gnes/client/http.py @@ -21,7 +21,6 @@ from concurrent.futures import ThreadPoolExecutor import grpc -from aiohttp import web from google.protobuf.json_format import MessageToJson from ..helper import set_logger @@ -34,6 +33,7 @@ def __init__(self, args=None): self.logger = set_logger(self.__class__.__name__, self.args.verbose) def start(self): + from aiohttp import web loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=self.args.max_workers) diff --git a/gnes/composer/base.py b/gnes/composer/base.py index 27e84983..294e6562 100644 --- a/gnes/composer/base.py +++ b/gnes/composer/base.py @@ -19,7 +19,7 @@ _yaml = YAML() -class YamlGraph: +class YamlComposer: comp2file = { 'Encoder': 'encode', 'Router': 'route', @@ -50,7 +50,7 @@ def __init__(self, layer_id: int = 0): @staticmethod def get_value(comp: Dict, key: str): - return comp.get(key, YamlGraph.Layer.default_values[key]) + return comp.get(key, YamlComposer.Layer.default_values[key]) @property def is_homogenous(self): @@ -83,7 +83,7 @@ def __repr__(self): def __init__(self, args): - self._layers = [] # type: List['YamlGraph.Layer'] + self._layers = [] # type: List['YamlComposer.Layer'] self.logger = set_logger(self.__class__.__name__) with args.yaml_path: tmp = _yaml.load(args.yaml_path) @@ -136,8 +136,8 @@ def add_layer(self, layer: 'Layer' = None) -> None: def add_comp(self, comp: Dict) -> None: self._layers[-1].append(comp) - def build_layers(self) -> List['YamlGraph.Layer']: - all_layers = [] # type: List['YamlGraph.Layer'] + def build_layers(self) -> List['YamlComposer.Layer']: + all_layers = [] # type: List['YamlComposer.Layer'] for idx, layer in enumerate(self._layers[1:] + [self._layers[0]], 1): last_layer = self._layers[idx - 1] for l in self._add_router(last_layer, layer): @@ -149,7 +149,7 @@ def build_layers(self) -> List['YamlGraph.Layer']: return all_layers @staticmethod - def build_dockerswarm(all_layers: List['YamlGraph.Layer'], docker_img: str = 'gnes/gnes:latest', + def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str = 'gnes/gnes:latest', volumes: Dict = None, networks: Dict = None) -> str: with resource_stream('gnes', '/'.join(('resources', 'compose', 'gnes-swarm.yml'))) as r: swarm_lines = _yaml.load(r) @@ -158,7 +158,7 @@ def build_dockerswarm(all_layers: List['YamlGraph.Layer'], docker_img: str = 'gn for c_idx, c in enumerate(layer.components): c_name = '%s%d%d' % (c['name'], l_idx, c_idx) args = ['--%s %s' % (a, str(v) if ' ' not in str(v) else ('"%s"' % str(v))) for a, v in c.items() if - a in YamlGraph.comp2args[c['name']] and v] + a in YamlComposer.comp2args[c['name']] and v] if 'yaml_path' in c and c['yaml_path'] is not None: args.append('--yaml_path /%s_yaml' % c_name) config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']} @@ -191,16 +191,16 @@ def build_dockerswarm(all_layers: List['YamlGraph.Layer'], docker_img: str = 'gn args += ['--host_in %s' % host_in_name] # '--host_out %s' % host_out_name] - cmd = '%s %s' % (YamlGraph.comp2file[c['name']], ' '.join(args)) + cmd = '%s %s' % (YamlComposer.comp2file[c['name']], ' '.join(args)) swarm_lines['services'][c_name] = CommentedMap({ 'image': docker_img, 'command': cmd, }) - rep_c = YamlGraph.Layer.get_value(c, 'replicas') + rep_c = YamlComposer.Layer.get_value(c, 'replicas') if rep_c > 1: swarm_lines['services'][c_name]['deploy'] = CommentedMap({ - 'replicas': YamlGraph.Layer.get_value(c, 'replicas'), + 'replicas': YamlComposer.Layer.get_value(c, 'replicas'), 'restart_policy': { 'condition': 'on-failure', 'max_attempts': 3, @@ -223,22 +223,22 @@ def build_dockerswarm(all_layers: List['YamlGraph.Layer'], docker_img: str = 'gn return stream.getvalue() @staticmethod - def build_kubernetes(all_layers: List['YamlGraph.Layer'], *args, **kwargs): + def build_kubernetes(all_layers: List['YamlComposer.Layer'], *args, **kwargs): pass @staticmethod - def build_shell(all_layers: List['YamlGraph.Layer'], log_redirect: str = None) -> str: + def build_shell(all_layers: List['YamlComposer.Layer'], log_redirect: str = None) -> str: shell_lines = [] for layer in all_layers: for c in layer.components: - rep_c = YamlGraph.Layer.get_value(c, 'replicas') + rep_c = YamlComposer.Layer.get_value(c, 'replicas') shell_lines.append('printf "starting service %s with %s replicas...\\n"' % ( colored(c['name'], 'green'), colored(rep_c, 'yellow'))) for _ in range(rep_c): - cmd = YamlGraph.comp2file[c['name']] + cmd = YamlComposer.comp2file[c['name']] args = ' '.join( ['--%s %s' % (a, str(v) if ' ' not in str(v) else ('"%s"' % str(v))) for a, v in c.items() if - a in YamlGraph.comp2args[c['name']] and v]) + a in YamlComposer.comp2args[c['name']] and v]) shell_lines.append('gnes %s %s %s &' % ( cmd, args, '>> %s 2>&1' % log_redirect if log_redirect else '')) @@ -246,7 +246,7 @@ def build_shell(all_layers: List['YamlGraph.Layer'], log_redirect: str = None) - return r.read().decode().replace('{{gnes-template}}', '\n'.join(shell_lines)) @staticmethod - def build_mermaid(all_layers: List['YamlGraph.Layer'], mermaid_leftright: bool = False) -> str: + def build_mermaid(all_layers: List['YamlComposer.Layer'], mermaid_leftright: bool = False) -> str: mermaid_graph = [] cls_dict = defaultdict(set) for l_idx, layer in enumerate(all_layers[1:] + [all_layers[0]], 1): @@ -255,20 +255,20 @@ def build_mermaid(all_layers: List['YamlGraph.Layer'], mermaid_leftright: bool = for c_idx, c in enumerate(last_layer.components): # if len(last_layer.components) > 1: # self.mermaid_graph.append('\tsubgraph %s%d' % (c['name'], c_idx)) - for j in range(YamlGraph.Layer.get_value(c, 'replicas')): + for j in range(YamlComposer.Layer.get_value(c, 'replicas')): for c1_idx, c1 in enumerate(layer.components): if c1['port_in'] == c['port_out']: p = '((%s%s))' if c['name'] == 'Router' else '(%s%s)' p1 = '((%s%s))' if c1['name'] == 'Router' else '(%s%s)' - for j1 in range(YamlGraph.Layer.get_value(c1, 'replicas')): + for j1 in range(YamlComposer.Layer.get_value(c1, 'replicas')): _id, _id1 = '%s%s%s' % (last_layer.layer_id, c_idx, j), '%s%s%s' % ( layer.layer_id, c1_idx, j1) conn_type = ( c['socket_out'].split('_')[0] + '/' + c1['socket_in'].split('_')[0]).lower() s_id = '%s%s' % (c_idx if len(last_layer.components) > 1 else '', - j if YamlGraph.Layer.get_value(c, 'replicas') > 1 else '') + j if YamlComposer.Layer.get_value(c, 'replicas') > 1 else '') s1_id = '%s%s' % (c1_idx if len(layer.components) > 1 else '', - j1 if YamlGraph.Layer.get_value(c1, 'replicas') > 1 else '') + j1 if YamlComposer.Layer.get_value(c1, 'replicas') > 1 else '') mermaid_graph.append( '\t%s%s%s-- %s -->%s%s%s' % ( c['name'], _id, p % (c['name'], s_id), conn_type, c1['name'], _id1, @@ -319,11 +319,15 @@ def std_or_print(f, content): 'timestamp': time.strftime("%a, %d %b %Y %H:%M:%S"), 'version': __version__ } + + cmds['html'] = self.build_html(cmds) + std_or_print(self.args.graph_path, cmds['mermaid']) std_or_print(self.args.shell_path, cmds['shell']) std_or_print(self.args.swarm_path, cmds['docker']) std_or_print(self.args.k8s_path, cmds['k8s']) - std_or_print(self.args.html_path, self.build_html(cmds)) + std_or_print(self.args.html_path, cmds['html']) + return cmds @staticmethod def _get_random_port(min_port: int = 49152, max_port: int = 65536) -> str: @@ -333,7 +337,7 @@ def _get_random_port(min_port: int = 49152, max_port: int = 65536) -> str: def _get_random_host(comp_name: str) -> str: return str(comp_name + str(random.randrange(0, 100))) - def _add_router(self, last_layer: 'YamlGraph.Layer', layer: 'YamlGraph.Layer') -> List['YamlGraph.Layer']: + def _add_router(self, last_layer: 'YamlComposer.Layer', layer: 'YamlComposer.Layer') -> List['YamlComposer.Layer']: def rule1(): # a shortcut fn: push connect the last and current last_layer.components[0]['socket_out'] = str(SocketType.PUSH_BIND) @@ -346,7 +350,7 @@ def rule2(): def rule3(): # a shortcut fn: (N)-2-(N) with push pull connection - router_layer = YamlGraph.Layer(layer_id=self._num_layer) + router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) r = CommentedMap({'name': 'Router', @@ -375,7 +379,7 @@ def rule5(): def rule6(): last_layer.components[0]['socket_out'] = str(SocketType.PUB_BIND) - router_layer = YamlGraph.Layer(layer_id=self._num_layer) + router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 for c in layer.components: income = self.Layer.get_value(c, 'income') @@ -394,7 +398,7 @@ def rule6(): def rule7(): last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) - router_layer = YamlGraph.Layer(layer_id=self._num_layer) + router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r0 = CommentedMap({'name': 'Router', 'yaml_path': None, @@ -406,7 +410,7 @@ def rule7(): router_layers.append(router_layer) last_layer.components[0]['port_out'] = r0['port_in'] - router_layer = YamlGraph.Layer(layer_id=self._num_layer) + router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 for c in layer.components: r = CommentedMap({'name': 'Router', @@ -423,7 +427,7 @@ def rule7(): def rule10(): last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) - router_layer = YamlGraph.Layer(layer_id=self._num_layer) + router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r0 = CommentedMap({'name': 'Router', 'yaml_path': None, @@ -441,7 +445,7 @@ def rule10(): def rule8(): last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) - router_layer = YamlGraph.Layer(layer_id=self._num_layer) + router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r = CommentedMap({'name': 'Router', 'yaml_path': None, @@ -475,7 +479,7 @@ def rule8(): else: self._num_layer -= 1 - router_layer = YamlGraph.Layer(layer_id=self._num_layer) + router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 router_layer.append(r) router_layers.append(router_layer) diff --git a/gnes/composer/flask.py b/gnes/composer/flask.py new file mode 100644 index 00000000..9d94704f --- /dev/null +++ b/gnes/composer/flask.py @@ -0,0 +1,47 @@ +import tempfile + +from .base import YamlComposer +from ..cli.parser import set_composer_parser + + +class YamlComposerFlask: + def __init__(self, args): + self.args = args + + def create_flask_app(self): + try: + from flask import Flask, request, abort, redirect, url_for + from flask_compress import Compress + from flask_cors import CORS + except ImportError: + raise ImportError('Flask or its dependencies are not fully installed, ' + 'they are required for serving HTTP requests.' + 'Please use "pip install Flask" to install it.') + + # support up to 10 concurrent HTTP requests + app = Flask(__name__) + + @app.route('/', methods=['GET']) + def get_homepage(): + return YamlComposer(set_composer_parser().parse_args([])).build_all()['html'] + + @app.route('/refresh', methods=['POST']) + def regenerate(): + data = request.form if request.form else request.json + f = tempfile.NamedTemporaryFile('w', delete=False).name + with open(f, 'w', encoding='utf8') as fp: + fp.write(data['yaml-config']) + try: + return YamlComposer(set_composer_parser().parse_args([ + '--yaml_path', f + ])).build_all()['html'] + except Exception: + return 'Bad YAML input, please kindly check the format, indent and content of your YAML file!' + + CORS(app, origins=self.args.cors) + Compress().init_app(app) + return app + + def run(self): + app = self.create_flask_app() + app.run(port=self.args.http_port, threaded=True, host='0.0.0.0') diff --git a/gnes/resources/compose/gnes-board.html b/gnes/resources/compose/gnes-board.html index 81fc94e0..22fe11c3 100644 --- a/gnes/resources/compose/gnes-board.html +++ b/gnes/resources/compose/gnes-board.html @@ -200,14 +200,28 @@ YAML config
- -
-                    
+                    
+
+
+ + +
+
+
+ + +
+ +
+ +
+
@@ -218,7 +232,7 @@

This is the workflow generated from your input YAML config, which helps you - to understand how microservices work together in GNES.

+ to understand how microservices work together in GNES.

diff --git a/gnes/resources/config/compose/default.yml b/gnes/resources/config/compose/default.yml new file mode 100644 index 00000000..25dcfae7 --- /dev/null +++ b/gnes/resources/config/compose/default.yml @@ -0,0 +1,13 @@ +port: 5566 +services: +- name: Preprocessor + replicas: 2 +- name: Encoder + replicas: 3 +- + - name: Indexer + yaml_path: indexer-binary.yml + replicas: 4 + - name: Indexer + yaml_path: indexer-fulltext.yml + replicas: 3 \ No newline at end of file diff --git a/setup.py b/setup.py index 6df1718a..b54bcb79 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,6 @@ 'protobuf', 'grpcio', 'ruamel.yaml>=0.15.89', - 'aiohttp==3.5.4', 'pyzmq>=17.1.0', ] bert_dep = ['bert-serving-server>=1.8.6', 'bert-serving-client>=1.8.6'] @@ -52,6 +51,7 @@ vision_dep = ['opencv-python>=4.0.0', 'torchvision==0.3.0', 'imagehash>=4.0'] leveldb_dep = ['plyvel>=1.0.5'] test_dep = ['pylint', 'memory_profiler>=0.55.0', 'psutil>=5.6.1', 'gputil>=1.4.0'] +http_dep = ['flask', 'flask-compress', 'flask-cors', 'flask-json', 'aiohttp==3.5.4'] all_dep = list(set(base_dep + cn_nlp_dep + vision_dep + leveldb_dep + test_dep + annoy_dep)) setup( diff --git a/tests/test_compose.py b/tests/test_compose.py index 42ea3244..d771e3a2 100644 --- a/tests/test_compose.py +++ b/tests/test_compose.py @@ -1,8 +1,9 @@ import os import unittest -from gnes.cli.parser import set_composer_parser -from gnes.composer.base import YamlGraph +from gnes.cli.parser import set_composer_parser, set_composer_flask_parser +from gnes.composer.base import YamlComposer +from gnes.composer.flask import YamlComposerFlask class TestCompose(unittest.TestCase): @@ -21,7 +22,7 @@ def _test_topology(self, yaml_path: str, num_layer_before: int, num_layer_after: '--yaml_path', yaml_path, '--html_path', self.html_path ]) - a = YamlGraph(args) + a = YamlComposer(args) self.assertEqual(len(a._layers), num_layer_before) r = a.build_layers() self.assertEqual(len(r), num_layer_after) @@ -32,6 +33,15 @@ def _test_topology(self, yaml_path: str, num_layer_before: int, num_layer_after: os.path.exists(self.html_path) print(a.build_dockerswarm(r)) + def test_flask(self): + yaml_path = os.path.join(self.dirname, 'yaml', 'topology1.yml') + args = set_composer_flask_parser().parse_args([ + '--flask', + '--yaml_path', yaml_path, + '--html_path', self.html_path + ]) + YamlComposerFlask(args).run() + def tearDown(self): if os.path.exists(self.html_path): os.remove(self.html_path)