Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

fix(composer): fix composer router generation logic #77

Merged
merged 9 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gnes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

# do not change this line
# this is managed by git tag and replaced on every release
__version__ = '0.0.25'
__version__ = '0.0.26'
43 changes: 24 additions & 19 deletions gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@
import argparse


def resolve_yaml_path(path):
# priority, filepath > classname > default
import os
import io
if hasattr(path, 'read'):
# already a readable stream
return path
elif os.path.exists(path):
return open(path, encoding='utf8')
elif path.isidentifier():
# possible class name
return io.StringIO('!%s {}' % path)
elif path.startswith('!'):
# possible YAML content
return io.StringIO(path)
else:
raise argparse.ArgumentTypeError('%s can not be resolved, it should be a readable stream,'
' or a valid file path, or a supported class name.' % path)


def set_base_parser():
from .. import __version__
from termcolor import colored
Expand Down Expand Up @@ -49,7 +69,7 @@ def set_composer_parser(parser=None):
type=str,
default='GNES app',
help='name of the instance')
parser.add_argument('--yaml_path', type=argparse.FileType('r'),
parser.add_argument('--yaml_path', type=resolve_yaml_path,
default=resource_stream(
'gnes', '/'.join(('resources', 'config', 'compose', 'default.yml'))),
help='yaml config of the service')
Expand Down Expand Up @@ -134,14 +154,12 @@ def _set_client_parser(parser=None):
def set_loadable_service_parser(parser=None):
if not parser:
parser = set_base_parser()
from pkg_resources import resource_stream
from ..service.base import SocketType
set_service_parser(parser)

parser.add_argument('--yaml_path', type=argparse.FileType('r'),
default=resource_stream(
'gnes', '/'.join(('resources', 'config', 'encoder', 'default.yml'))),
help='yaml config of the service')
parser.add_argument('--yaml_path', type=resolve_yaml_path, required=True,
help='yaml config of the service, it should be a readable stream,'
' or a valid file path, or a supported class name.')

parser.set_defaults(socket_in=SocketType.PULL_BIND,
socket_out=SocketType.PUSH_BIND)
Expand All @@ -151,39 +169,26 @@ def set_loadable_service_parser(parser=None):
def set_preprocessor_service_parser(parser=None):
if not parser:
parser = set_base_parser()
from pkg_resources import resource_stream
set_loadable_service_parser(parser)
parser.get_default('yaml_path').close()
parser.set_defaults(yaml_path=resource_stream(
'gnes', '/'.join(('resources', 'config', 'preprocessor', 'default.yml'))))

parser.set_defaults(read_only=True)
return parser


def set_router_service_parser(parser=None):
from pkg_resources import resource_stream
if not parser:
parser = set_base_parser()
set_loadable_service_parser(parser)
parser.get_default('yaml_path').close()
parser.set_defaults(yaml_path=resource_stream(
'gnes', '/'.join(('resources', 'config', 'router', 'default.yml'))))

parser.set_defaults(read_only=True)
return parser


def set_indexer_service_parser(parser=None):
from ..service.base import SocketType
from pkg_resources import resource_stream

if not parser:
parser = set_base_parser()
set_loadable_service_parser(parser)
parser.get_default('yaml_path').close()
parser.set_defaults(yaml_path=resource_stream(
'gnes', '/'.join(('resources', 'config', 'indexer', 'default.yml'))))

# encoder's port_out is indexer's port_in
parser.set_defaults(port_in=parser.get_default('port_out'),
Expand Down
84 changes: 36 additions & 48 deletions gnes/composer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class YamlComposer:
}

comp2args = {
'Encoder': set_loadable_service_parser().parse_args([]),
'Router': set_router_service_parser().parse_args([]),
'Indexer': set_indexer_service_parser().parse_args([]),
'Encoder': set_loadable_service_parser().parse_args(['--yaml_path', 'BaseEncoder']),
'Router': set_router_service_parser().parse_args(['--yaml_path', 'BaseRouter']),
'Indexer': set_indexer_service_parser().parse_args(['--yaml_path', 'BaseIndexer']),
'gRPCFrontend': set_grpc_frontend_parser().parse_args([]),
'Preprocessor': set_preprocessor_service_parser().parse_args([])
'Preprocessor': set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor'])
}

class Layer:
Expand Down Expand Up @@ -188,20 +188,23 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str =
args = ['--%s %s' % (a, str(v) if ' ' not in str(v) else ('"%s"' % str(v))) for a, v in c.items() if
a in YamlComposer.comp2args[c['name']] and a != 'yaml_path' and v]
if 'yaml_path' in c and c['yaml_path'] is not None:
args.append('--yaml_path /%s_yaml' % c_name)
config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']}

# if l_idx + 1 < len(all_layers):
# next_layer = all_layers[l_idx + 1]
# _l_idx = l_idx + 1
# else:
# next_layer = all_layers[0]
# _l_idx = 0
# host_out_name = ''
# for _c_idx, _c in enumerate(next_layer.components):
# if _c['port_in'] == c['port_out']:
# host_out_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx)
# break
if c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml'):
args.append('--yaml_path /%s_yaml' % c_name)
config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']}
else:
args.append('--yaml_path %s' % c['yaml_path'])

if l_idx + 1 < len(all_layers):
next_layer = all_layers[l_idx + 1]
_l_idx = l_idx + 1
else:
next_layer = all_layers[0]
_l_idx = 0
host_out_name = ''
for _c_idx, _c in enumerate(next_layer.components):
if _c['port_in'] == c['port_out']:
host_out_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx)
break

if l_idx - 1 >= 0:
last_layer = all_layers[l_idx - 1]
Expand All @@ -216,8 +219,10 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str =
host_in_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx)
break

args += ['--host_in %s' % host_in_name]
# '--host_out %s' % host_out_name]
if 'BIND' not in c['socket_out']:
args.append('--host_out %s' % host_out_name)
if 'BIND' not in c['socket_in']:
args.append('--host_in %s' % host_in_name)

cmd = '%s %s' % (YamlComposer.comp2file[c['name']], ' '.join(args))
swarm_lines['services'][c_name] = CommentedMap({
Expand All @@ -235,7 +240,8 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str =
}
})

if 'yaml_path' in c and c['yaml_path'] is not None:
if 'yaml_path' in c and c['yaml_path'] is not None \
and (c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml')):
swarm_lines['services'][c_name]['configs'] = ['%s_yaml' % c_name]

if c['name'] == 'gRPCFrontend':
Expand Down Expand Up @@ -382,7 +388,7 @@ def rule3():
self._num_layer += 1
last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT)
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseRouter',
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': last_layer.components[0]['port_out'],
Expand All @@ -403,6 +409,8 @@ def rule5():
# a shortcut fn: based on c3(): (N)-2-(N) with pub sub connection
rule3()
router_layers[0].components[0]['socket_out'] = str(SocketType.PUB_BIND)
router_layers[0].components[0]['yaml_path'] = '"!PublishRouter {parameter: {num_part: %d}}"' \
% len(layer.components)
for c in layer.components:
c['socket_in'] = str(SocketType.SUB_CONNECT)

Expand All @@ -413,7 +421,7 @@ def rule6():
for c in layer.components:
income = self.Layer.get_value(c, 'income')
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseReduceRouter',
'socket_in': str(SocketType.SUB_CONNECT),
'socket_out': str(SocketType.PUSH_BIND) if income == 'pull' else str(
SocketType.PUB_BIND),
Expand All @@ -430,7 +438,7 @@ def rule7():
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r0 = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': '"!PublishRouter {parameter: {num_part: %d}}"' % len(layer.components),
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUB_BIND),
'port_in': self._get_random_port(),
Expand All @@ -443,7 +451,7 @@ def rule7():
self._num_layer += 1
for c in layer.components:
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseRouter',
'socket_in': str(SocketType.SUB_CONNECT),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': r0['port_out'],
Expand All @@ -459,7 +467,7 @@ def rule10():
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r0 = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': '"!PublishRouter {parameter: {num_part: %d}}"' % len(layer.components),
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUB_BIND),
'port_in': self._get_random_port(),
Expand All @@ -476,7 +484,7 @@ def rule8():
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseReduceRouter',
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': self._get_random_port(),
Expand All @@ -487,7 +495,7 @@ def rule8():
if last_income == 'sub':
c['socket_out'] = str(SocketType.PUSH_CONNECT)
r_c = CommentedMap({'name': 'Router',
'yaml_path': None,
'yaml_path': 'BaseReduceRouter',
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_CONNECT),
'port_in': self._get_random_port(),
Expand Down Expand Up @@ -517,26 +525,6 @@ def rule9():
last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT)
layer.components[0]['socket_in'] = str(SocketType.PULL_BIND)

def rule11():
# a shortcut fn: (N)-2-(N) with push pull connection
router_layer = YamlComposer.Layer(layer_id=self._num_layer)
self._num_layer += 1
r = CommentedMap({'name': 'Router',
'yaml_path': None,
'socket_in': str(SocketType.PULL_BIND),
'socket_out': str(SocketType.PUSH_BIND),
'port_in': self._get_random_port(),
'port_out': self._get_random_port()})

for c in last_layer.components:
c['socket_out'] = str(SocketType.PUSH_CONNECT)
c['port_out'] = r['port_in']
for c in layer.components:
c['socket_in'] = str(SocketType.PULL_CONNECT)
c['port_in'] = r['port_out']
router_layer.append(r)
router_layers.append(router_layer)

router_layers = [] # type: List['self.Layer']
# bind the last out to current in

Expand Down
4 changes: 2 additions & 2 deletions gnes/encoder/image/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ def _encode(_, img: List['np.ndarray']):
# for video
if len(img[0].shape) == 4:
padding_image, max_lenth = _padding(img)
output = _encode(None, padding_image)
output = _encode(self, padding_image)
# for image
else:
output = _encode(None, img)
output = _encode(self, img)

return output
2 changes: 1 addition & 1 deletion gnes/encoder/image/inception.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ def _encode(_, data):
feed_dict={self.inputs: data})
return end_points_[self.select_layer]

return _encode(None, img).astype(np.float32)
return _encode(self, img).astype(np.float32)
4 changes: 2 additions & 2 deletions gnes/encoder/video/incep_mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _encode1(_, data):
feed_dict={self.inputs: data})
return end_points_[self.select_layer]

v = [_ for vi in _encode1(None, img) for _ in vi]
v = [_ for vi in _encode1(self, img) for _ in vi]

v_input = [v[s:e] for s, e in zip(pos_start, pos_end)]
v_input = [(vi + [[0.0] * self.input_size] * (max_len - len(vi)))[:max_len] for vi in v_input]
Expand All @@ -129,4 +129,4 @@ def _encode2(_, data):
return self.sess2.run(self.mix_model.repre,
feed_dict={self.mix_model.feeds: data})

return _encode2(None, v_input).astype(np.float32)
return _encode2(self, v_input).astype(np.float32)
10 changes: 10 additions & 0 deletions tests/test_batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,14 @@ def test_mini_batch(self):
def _do_mini_batch(_, y):
return y

# this will follow self.batch_size, which is None
@batching
def _do_mini_batch2(_, y):
return y

self.assertEqual(_do_mini_batch(None, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]])
self.assertEqual(_do_mini_batch2(self, x), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

self.batch_size = 4

self.assertEqual(_do_mini_batch2(self, x), [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]])
7 changes: 4 additions & 3 deletions tests/test_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ def setUp(self):
self.single_cn = '矫矫珍木巅,得无金丸惧。'
self.single_en = 'When forty winters shall besiege thy brow. And dig deep trenches in thy beautys field.'
self.dirname = os.path.dirname(__file__)
self.yaml_path = os.path.join(self.dirname, 'yaml', 'test-preprocessor.yml')

def test_preprocessor_service_empty(self):
args = set_preprocessor_service_parser().parse_args([])
args = set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor'])
with PreprocessorService(args):
pass

def test_preprocessor_service_echo(self):
args = set_preprocessor_service_parser().parse_args([])
args = set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor'])
c_args = _set_client_parser().parse_args([
'--port_in', str(args.port_out),
'--port_out', str(args.port_in)
Expand All @@ -38,7 +39,7 @@ def test_preprocessor_service_echo(self):
print(r)

def test_preprocessor_service_realdata(self):
args = set_preprocessor_service_parser().parse_args([])
args = set_preprocessor_service_parser().parse_args(['--yaml_path', self.yaml_path])
c_args = _set_client_parser().parse_args([
'--port_in', str(args.port_out),
'--port_out', str(args.port_in)
Expand Down
Loading