def resolve_yaml_path(path):
    """Resolve a ``--yaml_path`` argument into a readable text stream.

    Candidates are tried in priority order:
    readable stream > existing file > class name > inline YAML content.

    :param path: an object exposing ``read``, the path of an existing file,
        a bare class name (any valid Python identifier, e.g. ``BaseRouter``),
        or a YAML snippet starting with ``!``.
    :return: ``path`` itself when it is already a stream; a file handle
        opened as UTF-8 text when it names a regular file (the handle is
        returned open, closing it is left to the caller); otherwise an
        ``io.StringIO`` holding the YAML text.
    :raises argparse.ArgumentTypeError: when ``path`` matches none of the
        accepted forms, or when the file exists but cannot be opened.
    """
    import argparse
    import io
    import os

    if hasattr(path, 'read'):
        # already a readable stream
        return path

    if isinstance(path, os.PathLike):
        # keep accepting pathlib-style objects for the file branch
        path = os.fspath(path)

    if isinstance(path, (str, bytes)) and os.path.isfile(path):
        # isfile() instead of exists(): a directory must never reach open(),
        # otherwise IsADirectoryError escapes as a traceback because argparse
        # only turns ArgumentTypeError/TypeError/ValueError into usage errors
        try:
            return open(path, encoding='utf8')
        except OSError as ex:
            raise argparse.ArgumentTypeError('%s can not be opened: %s' % (path, ex)) from ex

    if isinstance(path, str):
        if path.isidentifier():
            # possible class name: wrap it as a tagged empty YAML mapping
            return io.StringIO('!%s {}' % path)
        if path.startswith('!'):
            # possible YAML content
            return io.StringIO(path)

    # (path,) guards the %-formatting against tuple arguments
    raise argparse.ArgumentTypeError('%s can not be resolved, it should be a readable stream,'
                                     ' or a valid file path, or a supported class name.' % (path,))
import __version__ from termcolor import colored @@ -49,7 +69,7 @@ def set_composer_parser(parser=None): type=str, default='GNES app', help='name of the instance') - parser.add_argument('--yaml_path', type=argparse.FileType('r'), + parser.add_argument('--yaml_path', type=resolve_yaml_path, default=resource_stream( 'gnes', '/'.join(('resources', 'config', 'compose', 'default.yml'))), help='yaml config of the service') @@ -134,14 +154,12 @@ def _set_client_parser(parser=None): def set_loadable_service_parser(parser=None): if not parser: parser = set_base_parser() - from pkg_resources import resource_stream from ..service.base import SocketType set_service_parser(parser) - parser.add_argument('--yaml_path', type=argparse.FileType('r'), - default=resource_stream( - 'gnes', '/'.join(('resources', 'config', 'encoder', 'default.yml'))), - help='yaml config of the service') + parser.add_argument('--yaml_path', type=resolve_yaml_path, required=True, + help='yaml config of the service, it should be a readable stream,' + ' or a valid file path, or a supported class name.') parser.set_defaults(socket_in=SocketType.PULL_BIND, socket_out=SocketType.PUSH_BIND) @@ -151,39 +169,26 @@ def set_loadable_service_parser(parser=None): def set_preprocessor_service_parser(parser=None): if not parser: parser = set_base_parser() - from pkg_resources import resource_stream set_loadable_service_parser(parser) - parser.get_default('yaml_path').close() - parser.set_defaults(yaml_path=resource_stream( - 'gnes', '/'.join(('resources', 'config', 'preprocessor', 'default.yml')))) parser.set_defaults(read_only=True) return parser def set_router_service_parser(parser=None): - from pkg_resources import resource_stream if not parser: parser = set_base_parser() set_loadable_service_parser(parser) - parser.get_default('yaml_path').close() - parser.set_defaults(yaml_path=resource_stream( - 'gnes', '/'.join(('resources', 'config', 'router', 'default.yml')))) - parser.set_defaults(read_only=True) return 
parser def set_indexer_service_parser(parser=None): from ..service.base import SocketType - from pkg_resources import resource_stream if not parser: parser = set_base_parser() set_loadable_service_parser(parser) - parser.get_default('yaml_path').close() - parser.set_defaults(yaml_path=resource_stream( - 'gnes', '/'.join(('resources', 'config', 'indexer', 'default.yml')))) # encoder's port_out is indexer's port_in parser.set_defaults(port_in=parser.get_default('port_out'), diff --git a/gnes/composer/base.py b/gnes/composer/base.py index e6fe7137..c92a6faf 100644 --- a/gnes/composer/base.py +++ b/gnes/composer/base.py @@ -43,11 +43,11 @@ class YamlComposer: } comp2args = { - 'Encoder': set_loadable_service_parser().parse_args([]), - 'Router': set_router_service_parser().parse_args([]), - 'Indexer': set_indexer_service_parser().parse_args([]), + 'Encoder': set_loadable_service_parser().parse_args(['--yaml_path', 'BaseEncoder']), + 'Router': set_router_service_parser().parse_args(['--yaml_path', 'BaseRouter']), + 'Indexer': set_indexer_service_parser().parse_args(['--yaml_path', 'BaseIndexer']), 'gRPCFrontend': set_grpc_frontend_parser().parse_args([]), - 'Preprocessor': set_preprocessor_service_parser().parse_args([]) + 'Preprocessor': set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor']) } class Layer: @@ -188,20 +188,23 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str = args = ['--%s %s' % (a, str(v) if ' ' not in str(v) else ('"%s"' % str(v))) for a, v in c.items() if a in YamlComposer.comp2args[c['name']] and a != 'yaml_path' and v] if 'yaml_path' in c and c['yaml_path'] is not None: - args.append('--yaml_path /%s_yaml' % c_name) - config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']} - - # if l_idx + 1 < len(all_layers): - # next_layer = all_layers[l_idx + 1] - # _l_idx = l_idx + 1 - # else: - # next_layer = all_layers[0] - # _l_idx = 0 - # host_out_name = '' - # for _c_idx, _c in 
enumerate(next_layer.components): - # if _c['port_in'] == c['port_out']: - # host_out_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx) - # break + if c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml'): + args.append('--yaml_path /%s_yaml' % c_name) + config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']} + else: + args.append('--yaml_path %s' % c['yaml_path']) + + if l_idx + 1 < len(all_layers): + next_layer = all_layers[l_idx + 1] + _l_idx = l_idx + 1 + else: + next_layer = all_layers[0] + _l_idx = 0 + host_out_name = '' + for _c_idx, _c in enumerate(next_layer.components): + if _c['port_in'] == c['port_out']: + host_out_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx) + break if l_idx - 1 >= 0: last_layer = all_layers[l_idx - 1] @@ -216,8 +219,10 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str = host_in_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx) break - args += ['--host_in %s' % host_in_name] - # '--host_out %s' % host_out_name] + if 'BIND' not in c['socket_out']: + args.append('--host_out %s' % host_out_name) + if 'BIND' not in c['socket_in']: + args.append('--host_in %s' % host_in_name) cmd = '%s %s' % (YamlComposer.comp2file[c['name']], ' '.join(args)) swarm_lines['services'][c_name] = CommentedMap({ @@ -235,7 +240,8 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str = } }) - if 'yaml_path' in c and c['yaml_path'] is not None: + if 'yaml_path' in c and c['yaml_path'] is not None \ + and (c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml')): swarm_lines['services'][c_name]['configs'] = ['%s_yaml' % c_name] if c['name'] == 'gRPCFrontend': @@ -382,7 +388,7 @@ def rule3(): self._num_layer += 1 last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) r = CommentedMap({'name': 'Router', - 'yaml_path': None, + 'yaml_path': 'BaseRouter', 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUSH_BIND), 'port_in': 
last_layer.components[0]['port_out'], @@ -403,6 +409,8 @@ def rule5(): # a shortcut fn: based on c3(): (N)-2-(N) with pub sub connection rule3() router_layers[0].components[0]['socket_out'] = str(SocketType.PUB_BIND) + router_layers[0].components[0]['yaml_path'] = '"!PublishRouter {parameter: {num_part: %d}}"' \ + % len(layer.components) for c in layer.components: c['socket_in'] = str(SocketType.SUB_CONNECT) @@ -413,7 +421,7 @@ def rule6(): for c in layer.components: income = self.Layer.get_value(c, 'income') r = CommentedMap({'name': 'Router', - 'yaml_path': None, + 'yaml_path': 'BaseReduceRouter', 'socket_in': str(SocketType.SUB_CONNECT), 'socket_out': str(SocketType.PUSH_BIND) if income == 'pull' else str( SocketType.PUB_BIND), @@ -430,7 +438,7 @@ def rule7(): router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r0 = CommentedMap({'name': 'Router', - 'yaml_path': None, + 'yaml_path': '"!PublishRouter {parameter: {num_part: %d}}"' % len(layer.components), 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUB_BIND), 'port_in': self._get_random_port(), @@ -443,7 +451,7 @@ def rule7(): self._num_layer += 1 for c in layer.components: r = CommentedMap({'name': 'Router', - 'yaml_path': None, + 'yaml_path': 'BaseRouter', 'socket_in': str(SocketType.SUB_CONNECT), 'socket_out': str(SocketType.PUSH_BIND), 'port_in': r0['port_out'], @@ -459,7 +467,7 @@ def rule10(): router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r0 = CommentedMap({'name': 'Router', - 'yaml_path': None, + 'yaml_path': '"!PublishRouter {parameter: {num_part: %d}}"' % len(layer.components), 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUB_BIND), 'port_in': self._get_random_port(), @@ -476,7 +484,7 @@ def rule8(): router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r = CommentedMap({'name': 'Router', - 'yaml_path': None, + 'yaml_path': 'BaseReduceRouter', 'socket_in': 
str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUSH_BIND), 'port_in': self._get_random_port(), @@ -487,7 +495,7 @@ def rule8(): if last_income == 'sub': c['socket_out'] = str(SocketType.PUSH_CONNECT) r_c = CommentedMap({'name': 'Router', - 'yaml_path': None, + 'yaml_path': 'BaseReduceRouter', 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUSH_CONNECT), 'port_in': self._get_random_port(), @@ -517,26 +525,6 @@ def rule9(): last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) layer.components[0]['socket_in'] = str(SocketType.PULL_BIND) - def rule11(): - # a shortcut fn: (N)-2-(N) with push pull connection - router_layer = YamlComposer.Layer(layer_id=self._num_layer) - self._num_layer += 1 - r = CommentedMap({'name': 'Router', - 'yaml_path': None, - 'socket_in': str(SocketType.PULL_BIND), - 'socket_out': str(SocketType.PUSH_BIND), - 'port_in': self._get_random_port(), - 'port_out': self._get_random_port()}) - - for c in last_layer.components: - c['socket_out'] = str(SocketType.PUSH_CONNECT) - c['port_out'] = r['port_in'] - for c in layer.components: - c['socket_in'] = str(SocketType.PULL_CONNECT) - c['port_in'] = r['port_out'] - router_layer.append(r) - router_layers.append(router_layer) - router_layers = [] # type: List['self.Layer'] # bind the last out to current in diff --git a/gnes/encoder/image/base.py b/gnes/encoder/image/base.py index 2409f607..5d1d05a1 100644 --- a/gnes/encoder/image/base.py +++ b/gnes/encoder/image/base.py @@ -118,9 +118,9 @@ def _encode(_, img: List['np.ndarray']): # for video if len(img[0].shape) == 4: padding_image, max_lenth = _padding(img) - output = _encode(None, padding_image) + output = _encode(self, padding_image) # for image else: - output = _encode(None, img) + output = _encode(self, img) return output diff --git a/gnes/encoder/image/inception.py b/gnes/encoder/image/inception.py index 9919fba7..222c13fe 100644 --- a/gnes/encoder/image/inception.py +++ 
b/gnes/encoder/image/inception.py @@ -74,4 +74,4 @@ def _encode(_, data): feed_dict={self.inputs: data}) return end_points_[self.select_layer] - return _encode(None, img).astype(np.float32) + return _encode(self, img).astype(np.float32) diff --git a/gnes/encoder/video/incep_mixture.py b/gnes/encoder/video/incep_mixture.py index b3991ca1..c7b13057 100644 --- a/gnes/encoder/video/incep_mixture.py +++ b/gnes/encoder/video/incep_mixture.py @@ -118,7 +118,7 @@ def _encode1(_, data): feed_dict={self.inputs: data}) return end_points_[self.select_layer] - v = [_ for vi in _encode1(None, img) for _ in vi] + v = [_ for vi in _encode1(self, img) for _ in vi] v_input = [v[s:e] for s, e in zip(pos_start, pos_end)] v_input = [(vi + [[0.0] * self.input_size] * (max_len - len(vi)))[:max_len] for vi in v_input] @@ -129,4 +129,4 @@ def _encode2(_, data): return self.sess2.run(self.mix_model.repre, feed_dict={self.mix_model.feeds: data}) - return _encode2(None, v_input).astype(np.float32) + return _encode2(self, v_input).astype(np.float32) diff --git a/tests/test_batching.py b/tests/test_batching.py index b5f05065..a7983d0a 100644 --- a/tests/test_batching.py +++ b/tests/test_batching.py @@ -98,4 +98,14 @@ def test_mini_batch(self): def _do_mini_batch(_, y): return y + # this will follow self.batch_size, which is None + @batching + def _do_mini_batch2(_, y): + return y + self.assertEqual(_do_mini_batch(None, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]) + self.assertEqual(_do_mini_batch2(self, x), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + + self.batch_size = 4 + + self.assertEqual(_do_mini_batch2(self, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]) diff --git a/tests/test_preprocessor.py b/tests/test_preprocessor.py index 835f76f3..a63359fd 100644 --- a/tests/test_preprocessor.py +++ b/tests/test_preprocessor.py @@ -14,14 +14,15 @@ def setUp(self): self.single_cn = '矫矫珍木巅,得无金丸惧。' self.single_en = 'When forty winters shall besiege thy brow. And dig deep trenches in thy beautys field.' 
self.dirname = os.path.dirname(__file__) + self.yaml_path = os.path.join(self.dirname, 'yaml', 'test-preprocessor.yml') def test_preprocessor_service_empty(self): - args = set_preprocessor_service_parser().parse_args([]) + args = set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor']) with PreprocessorService(args): pass def test_preprocessor_service_echo(self): - args = set_preprocessor_service_parser().parse_args([]) + args = set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor']) c_args = _set_client_parser().parse_args([ '--port_in', str(args.port_out), '--port_out', str(args.port_in) @@ -38,7 +39,7 @@ def test_preprocessor_service_echo(self): print(r) def test_preprocessor_service_realdata(self): - args = set_preprocessor_service_parser().parse_args([]) + args = set_preprocessor_service_parser().parse_args(['--yaml_path', self.yaml_path]) c_args = _set_client_parser().parse_args([ '--port_in', str(args.port_out), '--port_out', str(args.port_in) diff --git a/tests/test_router.py b/tests/test_router.py index b9430569..8fe587ec 100644 --- a/tests/test_router.py +++ b/tests/test_router.py @@ -14,15 +14,15 @@ class TestProto(unittest.TestCase): def setUp(self): self.dirname = os.path.dirname(__file__) - self.publish_router_yaml = os.path.join(self.dirname, 'yaml', 'router-publish.yml') - self.batch_router_yaml = os.path.join(self.dirname, 'yaml', 'router-batch.yml') - self.reduce_router_yaml = os.path.join(self.dirname, 'yaml', 'router-reduce.yml') - self.chunk_router_yaml = os.path.join(self.dirname, 'yaml', 'router-chunk-reduce.yml') - self.doc_router_yaml = os.path.join(self.dirname, 'yaml', 'router-doc-reduce.yml') - self.concat_router_yaml = os.path.join(self.dirname, 'yaml', 'router-concat.yml') + self.publish_router_yaml = '!PublishRouter {parameter: {num_part: 2}}' + self.batch_router_yaml = '!DocBatchRouter {gnes_config: {batch_size: 2}}' + self.reduce_router_yaml = 'BaseReduceRouter' + 
self.chunk_router_yaml = 'ChunkReduceRouter' + self.doc_router_yaml = 'DocReduceRouter' + self.concat_router_yaml = 'ConcatEmbedRouter' def test_service_empty(self): - args = set_router_service_parser().parse_args([]) + args = set_router_service_parser().parse_args(['--yaml_path', 'BaseRouter']) with RouterService(args): pass @@ -287,25 +287,29 @@ def test_multimap_multireduce(self): '--socket_in', str(SocketType.SUB_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), '--port_in', str(p21.port_out), - '--port_out', str(r41.port_in) + '--port_out', str(r41.port_in), + '--yaml_path', 'BaseRouter' ]) r312 = set_router_service_parser().parse_args([ '--socket_in', str(SocketType.SUB_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), '--port_in', str(p21.port_out), - '--port_out', str(r41.port_in) + '--port_out', str(r41.port_in), + '--yaml_path', 'BaseRouter' ]) r321 = set_router_service_parser().parse_args([ '--socket_in', str(SocketType.SUB_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), '--port_in', str(p22.port_out), - '--port_out', str(r42.port_in) + '--port_out', str(r42.port_in), + '--yaml_path', 'BaseRouter' ]) r322 = set_router_service_parser().parse_args([ '--socket_in', str(SocketType.SUB_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), '--port_in', str(p22.port_out), - '--port_out', str(r42.port_in) + '--port_out', str(r42.port_in), + '--yaml_path', 'BaseRouter' ]) c_args = _set_client_parser().parse_args([ diff --git a/tests/test_stream_grpc.py b/tests/test_stream_grpc.py index f500fd20..0ae92da3 100644 --- a/tests/test_stream_grpc.py +++ b/tests/test_stream_grpc.py @@ -49,6 +49,7 @@ def test_grpc_frontend(self): '--port_out', str(args.port_in), '--socket_in', str(SocketType.PULL_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), + '--yaml_path', 'BaseRouter' ]) with RouterService(p_args), GRPCFrontend(args), grpc.insecure_channel( @@ -72,6 +73,7 @@ def test_async_block(self): '--port_out', '8899', '--socket_in', 
str(SocketType.PULL_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), + '--yaml_path', 'BaseRouter' ]) p2_args = set_router_service_parser().parse_args([ @@ -79,6 +81,7 @@ def test_async_block(self): '--port_out', str(args.port_in), '--socket_in', str(SocketType.PULL_BIND), '--socket_out', str(SocketType.PUSH_CONNECT), + '--yaml_path', 'BaseRouter' ]) with GRPCFrontend(args), Router1(p1_args), Router2(p2_args), grpc.insecure_channel( diff --git a/tests/yaml/test-preprocessor.yml b/tests/yaml/test-preprocessor.yml new file mode 100644 index 00000000..fe7f0fca --- /dev/null +++ b/tests/yaml/test-preprocessor.yml @@ -0,0 +1,7 @@ +!TextPreprocessor +parameter: + start_doc_id: 0 + random_doc_id: True + deliminator: "[.。!?!?]+" +gnes_config: + is_trained: true \ No newline at end of file