Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #77 from gnes-ai/feat-yaml-path
Browse files Browse the repository at this point in the history
 fix(composer): fix composer router generation logic
  • Loading branch information
jemmyshin authored Aug 7, 2019
2 parents 4edb8ee + 24f9fd1 commit 5c76cd1
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 87 deletions.
2 changes: 1 addition & 1 deletion gnes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

# do not change this line
# this is managed by git tag and replaced on every release
__version__ = '0.0.25'
__version__ = '0.0.26'
43 changes: 24 additions & 19 deletions gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@
import argparse


def resolve_yaml_path(path):
# priority, filepath > classname > default
import os
import io
if hasattr(path, 'read'):
# already a readable stream
return path
elif os.path.exists(path):
return open(path, encoding='utf8')
elif path.isidentifier():
# possible class name
return io.StringIO('!%s {}' % path)
elif path.startswith('!'):
# possible YAML content
return io.StringIO(path)
else:
raise argparse.ArgumentTypeError('%s can not be resolved, it should be a readable stream,'
' or a valid file path, or a supported class name.' % path)


def set_base_parser():
from .. import __version__
from termcolor import colored
Expand Down Expand Up @@ -49,7 +69,7 @@ def set_composer_parser(parser=None):
type=str,
default='GNES app',
help='name of the instance')
parser.add_argument('--yaml_path', type=argparse.FileType('r'),
parser.add_argument('--yaml_path', type=resolve_yaml_path,
default=resource_stream(
'gnes', '/'.join(('resources', 'config', 'compose', 'default.yml'))),
help='yaml config of the service')
Expand Down Expand Up @@ -134,14 +154,12 @@ def _set_client_parser(parser=None):
def set_loadable_service_parser(parser=None):
if not parser:
parser = set_base_parser()
from pkg_resources import resource_stream
from ..service.base import SocketType
set_service_parser(parser)

parser.add_argument('--yaml_path', type=argparse.FileType('r'),
default=resource_stream(
'gnes', '/'.join(('resources', 'config', 'encoder', 'default.yml'))),
help='yaml config of the service')
parser.add_argument('--yaml_path', type=resolve_yaml_path, required=True,
help='yaml config of the service, it should be a readable stream,'
' or a valid file path, or a supported class name.')

parser.set_defaults(socket_in=SocketType.PULL_BIND,
socket_out=SocketType.PUSH_BIND)
Expand All @@ -151,39 +169,26 @@ def set_loadable_service_parser(parser=None):
def set_preprocessor_service_parser(parser=None):
if not parser:
parser = set_base_parser()
from pkg_resources import resource_stream
set_loadable_service_parser(parser)
parser.get_default('yaml_path').close()
parser.set_defaults(yaml_path=resource_stream(
'gnes', '/'.join(('resources', 'config', 'preprocessor', 'default.yml'))))

parser.set_defaults(read_only=True)
return parser


def set_router_service_parser(parser=None):
from pkg_resources import resource_stream
if not parser:
parser = set_base_parser()
set_loadable_service_parser(parser)
parser.get_default('yaml_path').close()
parser.set_defaults(yaml_path=resource_stream(
'gnes', '/'.join(('resources', 'config', 'router', 'default.yml'))))

parser.set_defaults(read_only=True)
return parser


def set_indexer_service_parser(parser=None):
from ..service.base import SocketType
from pkg_resources import resource_stream

if not parser:
parser = set_base_parser()
set_loadable_service_parser(parser)
parser.get_default('yaml_path').close()
parser.set_defaults(yaml_path=resource_stream(
'gnes', '/'.join(('resources', 'config', 'indexer', 'default.yml'))))

# encoder's port_out is indexer's port_in
parser.set_defaults(port_in=parser.get_default('port_out'),
Expand Down
84 changes: 36 additions & 48 deletions gnes/composer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class YamlComposer:
}

comp2args = {
'Encoder': set_loadable_service_parser().parse_args([]),
'Router': set_router_service_parser().parse_args([]),
'Indexer': set_indexer_service_parser().parse_args([]),
'Encoder': set_loadable_service_parser().parse_args(['--yaml_path', 'BaseEncoder']),
'Router': set_router_service_parser().parse_args(['--yaml_path', 'BaseRouter']),
'Indexer': set_indexer_service_parser().parse_args(['--yaml_path', 'BaseIndexer']),
'gRPCFrontend': set_grpc_frontend_parser().parse_args([]),
'Preprocessor': set_preprocessor_service_parser().parse_args([])
'Preprocessor': set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor'])
}

class Layer:
Expand Down Expand Up @@ -188,20 +188,23 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str =
args = ['--%s %s' % (a, str(v) if ' ' not in str(v) else ('"%s"' % str(v))) for a, v in c.items() if
a in YamlComposer.comp2args[c['name']] and a != 'yaml_path' and v]
if 'yaml_path' in c and c['yaml_path'] is not None:
args.append('--yaml_path /%s_yaml' % c_name)
config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']}

# if l_idx + 1 < len(all_layers):
# next_layer = all_layers[l_idx + 1]
# _l_idx = l_idx + 1
# else:
# next_layer = all_layers[0]
# _l_idx = 0
# host_out_name = ''
# for _c_idx, _c in enumerate(next_layer.components):
# if _c['port_in'] == c['port_out']:
# host_out_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx)
# break
if c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml'):
args.append('--yaml_path /%s_yaml' % c_name)
config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']}
else:
args.append('--yaml_path %s' % c['yaml_path'])

if l_idx + 1 < len(all_layers):
next_layer = all_layers[l_idx + 1]
_l_idx = l_idx + 1
else:
next_layer = all_layers[0]
_l_idx = 0
host_out_name = ''
for _c_idx, _c in enumerate(next_layer.components):
if _c['port_in'] == c['port_out']:
host_out_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx)
break

if l_idx - 1 >= 0:
last_layer = all_layers[l_idx - 1]
Expand All @@ -216,8 +219,10 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str =
host_in_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx)
break

args += ['--host_in %s' % host_in_name]
# '--host_out %s' % host_out_name]
if 'BIND' not in c['socket_out']:
args.append('--host_out %s' % host_out_name)
if 'BIND' not in c['socket_in']:
args.append('--host_in %s' % host_in_name)

cmd = '%s %s' % (YamlComposer.comp2file[c['name']], ' '.join(args))
swarm_lines['services'][c_name] = CommentedMap({
Expand All @@ -235,7 +240,8 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str =
}
})

if 'yaml_path' in c and c['yaml_path'] is not None:
if 'yaml_path' in c and c['yaml_path'] is not None \
and (c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml')):
swarm_lines['services'][c_name]['configs'] = ['%s_yaml' % c_name]

if c['name'] == 'gRPCFrontend':
Expand Down Expand Up @@ -382,7 +388,7 @@ def rule3():
self._num_layer += 1
last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT)
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseRouter',
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': last_layer.components[0]['port_out'],
Expand All @@ -403,6 +409,8 @@ def rule5():
# a shortcut fn: based on c3(): (N)-2-(N) with pub sub connection
rule3()
router_layers[0].components[0]['socket_out'] = str(SocketType.PUB_BIND)
router_layers[0].components[0]['yaml_path'] = '"!PublishRouter {parameter: {num_part: %d}}"' \
% len(layer.components)
for c in layer.components:
c['socket_in'] = str(SocketType.SUB_CONNECT)

Expand All @@ -413,7 +421,7 @@ def rule6():
for c in layer.components:
income = self.Layer.get_value(c, 'income')
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseReduceRouter',
'socket_in': str(SocketType.SUB_CONNECT),
'socket_out': str(SocketType.PUSH_BIND) if income == 'pull' else str(
SocketType.PUB_BIND),
Expand All @@ -430,7 +438,7 @@ def rule7():
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r0 = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': '"!PublishRouter {parameter: {num_part: %d}}"' % len(layer.components),
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUB_BIND),
'port_in': self._get_random_port(),
Expand All @@ -443,7 +451,7 @@ def rule7():
self._num_layer += 1
for c in layer.components:
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseRouter',
'socket_in': str(SocketType.SUB_CONNECT),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': r0['port_out'],
Expand All @@ -459,7 +467,7 @@ def rule10():
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r0 = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': '"!PublishRouter {parameter: {num_part: %d}}"' % len(layer.components),
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUB_BIND),
'port_in': self._get_random_port(),
Expand All @@ -476,7 +484,7 @@ def rule8():
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseReduceRouter',
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': self._get_random_port(),
Expand All @@ -487,7 +495,7 @@ def rule8():
if last_income == 'sub':
c['socket_out'] = str(SocketType.PUSH_CONNECT)
r_c = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseReduceRouter',
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_CONNECT),
'port_in': self._get_random_port(),
Expand Down Expand Up @@ -517,26 +525,6 @@ def rule9():
last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT)
layer.components[0]['socket_in'] = str(SocketType.PULL_BIND)

def rule11():
# a shortcut fn: (N)-2-(N) with push pull connection
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': self._get_random_port(),
'port_out': self._get_random_port()})

for c in last_layer.components:
c['socket_out'] = str(SocketType.PUSH_CONNECT)
c['port_out'] = r['port_in']
for c in layer.components:
c['socket_in'] = str(SocketType.PULL_CONNECT)
c['port_in'] = r['port_out']
router_layer.append(r)
router_layers.append(router_layer)

router_layers = [] # type: List['self.Layer']
# bind the last out to current in

Expand Down
4 changes: 2 additions & 2 deletions gnes/encoder/image/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ def _encode(_, img: List['np.ndarray']):
# for video
if len(img[0].shape) == 4:
padding_image, max_lenth = _padding(img)
output = _encode(None, padding_image)
output = _encode(self, padding_image)
# for image
else:
output = _encode(None, img)
output = _encode(self, img)

return output
2 changes: 1 addition & 1 deletion gnes/encoder/image/inception.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ def _encode(_, data):
feed_dict={self.inputs: data})
return end_points_[self.select_layer]

return _encode(None, img).astype(np.float32)
return _encode(self, img).astype(np.float32)
4 changes: 2 additions & 2 deletions gnes/encoder/video/incep_mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _encode1(_, data):
feed_dict={self.inputs: data})
return end_points_[self.select_layer]

v = [_ for vi in _encode1(None, img) for _ in vi]
v = [_ for vi in _encode1(self, img) for _ in vi]

v_input = [v[s:e] for s, e in zip(pos_start, pos_end)]
v_input = [(vi + [[0.0] * self.input_size] * (max_len - len(vi)))[:max_len] for vi in v_input]
Expand All @@ -129,4 +129,4 @@ def _encode2(_, data):
return self.sess2.run(self.mix_model.repre,
feed_dict={self.mix_model.feeds: data})

return _encode2(None, v_input).astype(np.float32)
return _encode2(self, v_input).astype(np.float32)
10 changes: 10 additions & 0 deletions tests/test_batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,14 @@ def test_mini_batch(self):
def _do_mini_batch(_, y):
return y

# this will follow self.batch_size, which is None
@batching
def _do_mini_batch2(_, y):
return y

self.assertEqual(_do_mini_batch(None, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]])
self.assertEqual(_do_mini_batch2(self, x), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

self.batch_size = 4

self.assertEqual(_do_mini_batch2(self, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]])
7 changes: 4 additions & 3 deletions tests/test_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ def setUp(self):
self.single_cn = '矫矫珍木巅,得无金丸惧。'
self.single_en = 'When forty winters shall besiege thy brow. And dig deep trenches in thy beautys field.'
self.dirname = os.path.dirname(__file__)
self.yaml_path = os.path.join(self.dirname, 'yaml', 'test-preprocessor.yml')

def test_preprocessor_service_empty(self):
args = set_preprocessor_service_parser().parse_args([])
args = set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor'])
with PreprocessorService(args):
pass

def test_preprocessor_service_echo(self):
args = set_preprocessor_service_parser().parse_args([])
args = set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor'])
c_args = _set_client_parser().parse_args([
'--port_in', str(args.port_out),
'--port_out', str(args.port_in)
Expand All @@ -38,7 +39,7 @@ def test_preprocessor_service_echo(self):
print(r)

def test_preprocessor_service_realdata(self):
args = set_preprocessor_service_parser().parse_args([])
args = set_preprocessor_service_parser().parse_args(['--yaml_path', self.yaml_path])
c_args = _set_client_parser().parse_args([
'--port_in', str(args.port_out),
'--port_out', str(args.port_in)
Expand Down
Loading

0 comments on commit 5c76cd1

Please sign in to comment.