This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

fix(parser): use str instead of textio stream to prevent serializer err
hanhxiao committed Oct 9, 2019
1 parent 6a36833 commit 14f5280
Showing 5 changed files with 149 additions and 82 deletions.
2 changes: 1 addition & 1 deletion gnes/cli/api.py
@@ -86,7 +86,7 @@ def _client_http(args):

def _client_cli(args):
from ..client.cli import CLIClient
CLIClient(args).start()
CLIClient(args)


def compose(args):
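The dropped .start() call implies that CLIClient now starts its work inside the constructor; a start_at_init flag with a True default is a plausible reading, given the CLIClient(p_args, start_at_init=False) call that appears in gnes/flow/__init__.py below. A minimal sketch of that assumed contract:

# Hypothetical sketch of the assumed CLIClient behaviour, not the actual gnes implementation.
class CLIClient:
    def __init__(self, args, start_at_init: bool = True):
        self.args = args
        if start_at_init:
            self.start()  # starts immediately, so _client_cli no longer needs to call .start()

    def start(self):
        ...  # connect to the frontend and stream requests built from self.args
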
2 changes: 1 addition & 1 deletion gnes/cli/parser.py
@@ -64,7 +64,7 @@ def resolve_yaml_path(path):
# already a readable stream
return path
elif os.path.exists(path):
return open(path, encoding='utf8')
return path
elif path.isidentifier():
# possible class name
return io.StringIO('!%s {}' % path)
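Returning the plain path string keeps the parsed CLI arguments serializable; an open text stream cannot be pickled or deep-copied, which is presumably the serializer error the commit title refers to, since the parsed args are later stored per service and copied around by the Flow. A small sketch of that assumed failure mode:

# Sketch of the assumed failure mode: a str survives serialization, an open TextIOWrapper does not.
import pickle

with open('config.yml', 'w', encoding='utf8') as f:
    f.write('!BaseEncoder {}')          # any YAML content; the file name is illustrative

pickle.dumps('config.yml')              # fine: the new return value is just a str
try:
    pickle.dumps(open('config.yml', encoding='utf8'))   # old return value
except TypeError as e:
    print(e)                            # cannot pickle '_io.TextIOWrapper' object
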
3 changes: 2 additions & 1 deletion gnes/encoder/__init__.py
@@ -45,7 +45,8 @@
'VggishEncoder': 'audio.vggish',
'YouTube8MFeatureExtractor': 'video.yt8m_feature_extractor',
'YouTube8MEncoder': 'video.yt8m_model',
'QuantizerEncoder': 'numeric.quantizer'
'QuantizerEncoder': 'numeric.quantizer',
'CharEmbeddingEncoder': 'text.char'
}

register_all_class(_cls2file_map, 'encoder')
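The registry maps class names to module paths relative to the gnes.encoder package, so the newly registered CharEmbeddingEncoder resolves to gnes.encoder.text.char. A hedged sketch of the lazy name-to-class lookup such a map typically supports (register_all_class itself is not shown in this diff, so this is an assumption):

# Sketch only: how a name -> module map can be used for lazy class loading.
import importlib

_cls2file_map = {
    'QuantizerEncoder': 'numeric.quantizer',
    'CharEmbeddingEncoder': 'text.char',
}

def load_class(name: str, package: str = 'gnes.encoder'):
    module = importlib.import_module('%s.%s' % (package, _cls2file_map[name]))
    return getattr(module, name)   # e.g. gnes.encoder.text.char.CharEmbeddingEncoder
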
197 changes: 123 additions & 74 deletions gnes/flow/__init__.py
@@ -1,3 +1,4 @@
import copy
from collections import OrderedDict, defaultdict
from contextlib import ExitStack
from functools import wraps
@@ -24,6 +25,18 @@ class Service(BetterEnum):
Preprocessor = 4


class FlowImcompleteError(ValueError):
"""Exception when the flow missing some important component to run"""


class FlowTopologyError(ValueError):
"""Exception when the topology is ambiguous"""


class FlowBuildLevelMismatch(ValueError):
"""Exception when required level is higher than the current build level"""


def _build_level(required_level: 'Flow.BuildLevel'):
def __build_level(func):
@wraps(func)
@@ -32,8 +45,9 @@ def arg_wrapper(self, *args, **kwargs):
if self._build_level.value >= required_level.value:
return func(self, *args, **kwargs)
else:
raise ValueError('build_level check failed for %s, required: %s, actual: %s' % (
func.__name__, required_level, self._build_level))
raise FlowBuildLevelMismatch(
'build_level check failed for %r, required level: %s, actual level: %s' % (
func, required_level, self._build_level))
else:
raise AttributeError('%r has no attribute "_build_level"' % self)

@@ -68,7 +82,7 @@ def __init__(self, with_frontend: bool = True, **kwargs):
self.logger = set_logger(self.__class__.__name__)
self._service_nodes = OrderedDict()
self._service_edges = {}
self._service_name_counter = {k: 0 for k in self._service2parser.keys()}
self._service_name_counter = {k: 0 for k in Flow._service2parser.keys()}
self._service_contexts = []
self._last_add_service = None
self._common_kwargs = kwargs
@@ -77,7 +91,7 @@ def __init__(self, with_frontend: bool = True, **kwargs):
self._build_level = Flow.BuildLevel.EMPTY
self._backend = None
if with_frontend:
self.add_frontend()
self.add_frontend(copy_flow=False)
else:
self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself')

@@ -137,7 +151,7 @@ def query(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs):

@_build_level(BuildLevel.RUNTIME)
def _call_client(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs):
args, p_args = self._get_parsed_args(set_client_cli_parser, kwargs)
args, p_args = self._get_parsed_args(self, set_client_cli_parser, kwargs)
p_args.grpc_port = self._service_nodes[self._frontend]['parsed_args'].grpc_port
p_args.grpc_host = self._service_nodes[self._frontend]['parsed_args'].grpc_host
c = CLIClient(p_args, start_at_init=False)
@@ -172,41 +186,46 @@ def add(self, service: 'Service',
name: str = None,
service_in: Union[str, Tuple[str], List[str], 'Service'] = None,
service_out: Union[str, Tuple[str], List[str], 'Service'] = None,
copy_flow: bool = True,
**kwargs) -> 'Flow':
"""
Add a service to the current flow object
Add a service to the current flow object and return the modified flow object
:param copy_flow: when set to True, a copy of the current flow is made and the modification is applied to
the copy, which is then returned; otherwise the current flow is modified in place
:param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
:param name: the name identifier of the service, useful in 'service_in' and 'service_out'
:param service_in: the name of the service(s) that this service receives data from.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param service_out: the name of the service(s) that this service sends data to.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param kwargs: other keyword-value arguments that the service CLI supports
:return: the current flow object
:return: a (new) flow object with the modification applied
"""

op_flow = copy.deepcopy(self) if copy_flow else self

if service not in Flow._service2parser:
raise ValueError('service: %s is not supported, should be one of %s' % (service, self._service2parser))
raise ValueError('service: %s is not supported, should be one of %s' % (service, Flow._service2parser))

if name in self._service_nodes:
raise ValueError('name: %s is used in this Flow already!' % name)
if name in op_flow._service_nodes:
raise FlowTopologyError('name: %s is used in this Flow already!' % name)
if not name:
name = '%s%d' % (service, self._service_name_counter[service])
self._service_name_counter[service] += 1
name = '%s%d' % (service, op_flow._service_name_counter[service])
op_flow._service_name_counter[service] += 1
if not name.isidentifier():
raise ValueError('name: %s is invalid, please follow the python variable name conventions' % name)

if service == Service.Frontend:
if self._frontend:
raise ValueError('frontend is already in this Flow')
self._frontend = name
if op_flow._frontend:
raise FlowTopologyError('frontend is already in this Flow')
op_flow._frontend = name

service_in = self._parse_service_endpoints(name, service_in, connect_to_last_service=True)
service_out = self._parse_service_endpoints(name, service_out, connect_to_last_service=False)
service_in = op_flow._parse_service_endpoints(op_flow, name, service_in, connect_to_last_service=True)
service_out = op_flow._parse_service_endpoints(op_flow, name, service_out, connect_to_last_service=False)

args, p_args = self._get_parsed_args(Flow._service2parser[service], kwargs)
args, p_args = op_flow._get_parsed_args(op_flow, Flow._service2parser[service], kwargs)

self._service_nodes[name] = {
op_flow._service_nodes[name] = {
'service': service,
'parsed_args': p_args,
'args': args,
@@ -215,41 +234,43 @@ def add(self, service: 'Service',

# direct all income services' output to the current service
for s in service_in:
self._service_nodes[s]['outgoings'].add(name)
op_flow._service_nodes[s]['outgoings'].add(name)
for s in service_out:
self._service_nodes[s]['incomes'].add(name)
op_flow._service_nodes[s]['incomes'].add(name)

self._last_add_service = name
op_flow._last_add_service = name

# graph is now changed so we need to
# reset the build level to the lowest
self._build_level = Flow.BuildLevel.EMPTY
op_flow._build_level = Flow.BuildLevel.EMPTY

return self
return op_flow

def _parse_service_endpoints(self, cur_service_name, service_endpoint, connect_to_last_service=False):
@staticmethod
def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False):
# parsing service_in
if isinstance(service_endpoint, str):
service_endpoint = [service_endpoint]
elif service_endpoint == Service.Frontend:
service_endpoint = [self._frontend]
service_endpoint = [op_flow._frontend]
elif not service_endpoint:
if self._last_add_service and connect_to_last_service:
service_endpoint = [self._last_add_service]
if op_flow._last_add_service and connect_to_last_service:
service_endpoint = [op_flow._last_add_service]
else:
service_endpoint = []
if isinstance(service_endpoint, list) or isinstance(service_endpoint, tuple):
for s in service_endpoint:
if s == cur_service_name:
raise ValueError('the income of a service can not be itself')
if s not in self._service_nodes:
raise ValueError('service_in: %s can not be found in this Flow' % s)
raise FlowTopologyError('the income of a service can not be itself')
if s not in op_flow._service_nodes:
raise FlowTopologyError('service_in: %s can not be found in this Flow' % s)
else:
raise ValueError('service_in=%s is not parsable' % service_endpoint)
return set(service_endpoint)

def _get_parsed_args(self, service_arg_parser, kwargs):
kwargs.update(self._common_kwargs)
@staticmethod
def _get_parsed_args(op_flow, service_arg_parser, kwargs):
kwargs.update(op_flow._common_kwargs)
args = []
for k, v in kwargs.items():
if isinstance(v, bool):
@@ -268,36 +289,40 @@ def _get_parsed_args(self, service_arg_parser, kwargs):
try:
p_args, unknown_args = service_arg_parser().parse_known_args(args)
if unknown_args:
self.logger.warning('not sure what these arguments are: %s' % unknown_args)
op_flow.logger.warning('not sure what these arguments are: %s' % unknown_args)
except SystemExit:
raise ValueError('bad arguments for service "%s", '
'you may want to double check your args "%s"' % (service_arg_parser, args))
return args, p_args

def _build_graph(self) -> 'Flow':
if not self._frontend:
raise ValueError('frontend does not exist, you may need to add_frontend()')
def _build_graph(self, copy_flow: bool) -> 'Flow':
op_flow = copy.deepcopy(self) if copy_flow else self

op_flow._service_edges.clear()

if not op_flow._frontend:
raise FlowImcompleteError('frontend does not exist, you may need to add_frontend()')

if not self._last_add_service or not self._service_nodes:
raise ValueError('flow is empty?')
if not op_flow._last_add_service or not op_flow._service_nodes:
raise FlowTopologyError('flow is empty?')

# close the loop
self._service_nodes[self._frontend]['incomes'].add(self._last_add_service)
op_flow._service_nodes[op_flow._frontend]['incomes'].add(op_flow._last_add_service)

# build all edges
for k, v in self._service_nodes.items():
for k, v in op_flow._service_nodes.items():
for s in v['incomes']:
self._service_edges['%s-%s' % (s, k)] = ''
op_flow._service_edges['%s-%s' % (s, k)] = ''
for t in v['outgoings']:
self._service_edges['%s-%s' % (k, t)] = ''
op_flow._service_edges['%s-%s' % (k, t)] = ''

for k in self._service_edges.keys():
for k in op_flow._service_edges.keys():
start_node, end_node = k.split('-')
edges_with_same_start = [ed for ed in self._service_edges.keys() if ed.startswith(start_node)]
edges_with_same_end = [ed for ed in self._service_edges.keys() if ed.endswith(end_node)]
edges_with_same_start = [ed for ed in op_flow._service_edges.keys() if ed.startswith(start_node)]
edges_with_same_end = [ed for ed in op_flow._service_edges.keys() if ed.endswith(end_node)]

s_pargs = self._service_nodes[start_node]['parsed_args']
e_pargs = self._service_nodes[end_node]['parsed_args']
s_pargs = op_flow._service_nodes[start_node]['parsed_args']
e_pargs = op_flow._service_nodes[end_node]['parsed_args']

# Rule
# if a node has multiple income/outgoing services,
@@ -314,21 +339,21 @@ def _build_graph(self) -> 'Flow':
s_pargs.host_out = BaseService.default_host
e_pargs.socket_in = SocketType.SUB_CONNECT
e_pargs.host_in = start_node
self._service_edges[k] = 'PUB-sub'
op_flow._service_edges[k] = 'PUB-sub'
elif len(edges_with_same_end) > 1 and len(edges_with_same_start) == 1:
s_pargs.socket_out = SocketType.PUSH_CONNECT
s_pargs.host_out = end_node
e_pargs.socket_in = SocketType.PULL_BIND
e_pargs.host_in = BaseService.default_host
self._service_edges[k] = 'push-PULL'
op_flow._service_edges[k] = 'push-PULL'
elif len(edges_with_same_start) == 1 and len(edges_with_same_end) == 1:
# in this case, either side can be BIND
# we prefer frontend to be always BIND
# check if either node is frontend
if start_node == self._frontend:
if start_node == op_flow._frontend:
s_pargs.socket_out = SocketType.PUSH_BIND
e_pargs.socket_in = SocketType.PULL_CONNECT
elif end_node == self._frontend:
elif end_node == op_flow._frontend:
s_pargs.socket_out = SocketType.PUSH_CONNECT
e_pargs.socket_in = SocketType.PULL_BIND
else:
@@ -337,41 +362,49 @@ def _build_graph(self) -> 'Flow':
if s_pargs.socket_out.is_bind:
s_pargs.host_out = BaseService.default_host
e_pargs.host_in = start_node
self._service_edges[k] = 'PUSH-pull'
op_flow._service_edges[k] = 'PUSH-pull'
elif e_pargs.socket_in.is_bind:
s_pargs.host_out = end_node
e_pargs.host_in = BaseService.default_host
self._service_edges[k] = 'push-PULL'
op_flow._service_edges[k] = 'push-PULL'
else:
raise ValueError('edge %s -> %s is ambiguous, at least one socket should be BIND')
raise FlowTopologyError('edge %s -> %s is ambiguous, at least one socket should be BIND' % (start_node, end_node))
else:
raise ValueError('found %d edges start with %s and %d edges end with %s, '
'this type of topology is ambiguous and should not exist, i can not determine the socket type' % (
len(edges_with_same_start), start_node, len(edges_with_same_end), end_node))
raise FlowTopologyError('found %d edges start with %s and %d edges end with %s, '
'this type of topology is ambiguous and should not exist, '
'i can not determine the socket type' % (
len(edges_with_same_start), start_node, len(edges_with_same_end), end_node))

self._build_level = Flow.BuildLevel.GRAPH
return self
op_flow._build_level = Flow.BuildLevel.GRAPH
return op_flow

def build(self, backend: Optional[str] = 'thread', copy_flow: bool = False, *args, **kwargs) -> 'Flow':
"""
Build the current flow and make it ready to use
:param backend: the backend to run the services with; supported: 'thread', 'process', 'swarm', 'k8s', 'shell'
:param copy_flow: when set to True, build a copy of the current flow and return it, leaving the current flow unchanged
:return: the built flow (the current flow itself, unless copy_flow is True)
"""

op_flow = self._build_graph(copy_flow)

def build(self, backend: Optional[str] = 'thread', *args, **kwargs) -> 'Flow':
self._build_graph()
if not backend:
self.logger.warning('no specified backend, build_level stays at %s, '
'and you can not run this flow.' % self._build_level)
op_flow.logger.warning('no specified backend, build_level stays at %s, '
'and you can not run this flow.' % op_flow._build_level)
elif backend in {'thread', 'process'}:
self._service_contexts.clear()
for v in self._service_nodes.values():
op_flow._service_contexts.clear()
for v in op_flow._service_nodes.values():
p_args = v['parsed_args']
p_args.parallel_backend = backend
# for thread and process backend which runs locally, host_in and host_out should not be set
p_args.host_in = BaseService.default_host
p_args.host_out = BaseService.default_host
s = self._service2builder[v['service']](p_args)
self._service_contexts.append(s)
self._build_level = Flow.BuildLevel.RUNTIME
op_flow._service_contexts.append((Flow._service2builder[v['service']], p_args))
op_flow._build_level = Flow.BuildLevel.RUNTIME
else:
raise NotImplementedError('backend=%s is not supported yet' % backend)

return self
return op_flow

def __call__(self, *args, **kwargs):
return self.build(*args, **kwargs)
@@ -381,13 +414,29 @@ def __enter__(self):
self.logger.warning(
'current build_level=%s, lower than required. '
'build the flow now via build() with default parameters' % self._build_level)
self.build()
self.build(copy_flow=False)
self._service_stack = ExitStack()
for k in self._service_contexts:
self._service_stack.enter_context(k)
for k, v in self._service_contexts:
self._service_stack.enter_context(k(v))

self.logger.critical('flow is built and ready, current build level is %s' % self._build_level)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
if hasattr(self, '_service_stack'):
self._service_stack.close()
self._build_level = Flow.BuildLevel.GRAPH
self._build_level = Flow.BuildLevel.EMPTY
self.logger.critical(
'flow is closed and all resources should be released already, current build level is %s' % self._build_level)

def __getstate__(self):
d = dict(self.__dict__)
del d['logger']
return d

def __setstate__(self, d):
self.__dict__.update(d)
self.logger = set_logger(self.__class__.__name__)
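Taken together, the reworked Flow copies itself on add() (copy_flow=True by default), stores (builder, parsed_args) pairs instead of live services, and only instantiates them inside the context manager. An illustrative usage sketch: the yaml_path values and the byte generator are assumptions, while the add()/build()/query() calls and the copy_flow semantics follow this diff.

# Illustrative sketch only; service configuration values are made up.
from gnes.flow import Flow, Service

f = (Flow()                                              # a frontend is added automatically
     .add(Service.Preprocessor, yaml_path='BasePreprocessor')
     .add(Service.Encoder, yaml_path='BaseEncoder')
     .add(Service.Indexer, yaml_path='BaseIndexer'))     # each add() returns a modified copy

with f.build(backend='thread') as flow:                  # build() works in place unless copy_flow=True
    flow.query(bytes_gen=(b'query-%d' % i for i in range(3)))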
