Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #311 from gnes-ai/feat-flow
Browse files Browse the repository at this point in the history
feat(flow): first version of gnes flow
  • Loading branch information
Han Xiao authored Oct 9, 2019
2 parents 12f7b70 + c5af930 commit 3aab341
Show file tree
Hide file tree
Showing 18 changed files with 807 additions and 36 deletions.
5 changes: 1 addition & 4 deletions gnes/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ def route(args):

def frontend(args):
from ..service.frontend import FrontendService
import threading
with FrontendService(args):
forever = threading.Event()
forever.wait()
_start_service(FrontendService, args)


def client(args):
Expand Down
36 changes: 25 additions & 11 deletions gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,27 @@ def resolve_py_path(path):
return path


def resolve_yaml_path(path):
def random_port(port):
if not port or int(port) <= 0:
import random
min_port, max_port = 49152, 65536
return random.randrange(min_port, max_port)
else:
return int(port)


def resolve_yaml_path(path, to_stream=False):
# priority, filepath > classname > default
import os
import io
if hasattr(path, 'read'):
# already a readable stream
return path
elif os.path.exists(path):
return open(path, encoding='utf8')
if to_stream:
return open(path, encoding='utf8')
else:
return path
elif path.isidentifier():
# possible class name
return io.StringIO('!%s {}' % path)
Expand All @@ -68,8 +80,9 @@ def resolve_yaml_path(path):


def set_base_parser():
from .. import __version__
from .. import __version__, __proto_version__
from termcolor import colored
import os
# create the top-level parser
parser = argparse.ArgumentParser(
description='%s, a cloud-native semantic search system '
Expand All @@ -79,7 +92,9 @@ def set_base_parser():
colored('GNES v%s: Generic Neural Elastic Search' % __version__, 'green'),
colored('https://gnes.ai', 'cyan', attrs=['underline'])),
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + __version__)
parser.add_argument('-v', '--version', action='version',
version='%(prog)s' + ': %s\nprotobuf: %s\nvcs_version: %s' %
(__version__, __proto_version__, os.environ.get('GNES_VCS_VERSION', 'unknown')))
parser.add_argument('--verbose', action='store_true', default=False,
help='turn on detailed logging for debug')
return parser
Expand All @@ -98,7 +113,7 @@ def set_composer_parser(parser=None):
type=str,
default='GNES app',
help='name of the instance')
parser.add_argument('--yaml_path', type=resolve_yaml_path,
parser.add_argument('--yaml_path', type=lambda x: resolve_yaml_path(x, True),
default=resource_stream(
'gnes', '/'.join(('resources', 'compose', 'gnes-example.yml'))),
help='yaml config of the service')
Expand Down Expand Up @@ -139,14 +154,14 @@ def set_composer_flask_parser(parser=None):

def set_service_parser(parser=None):
from ..service.base import SocketType, BaseService, ParallelType
import random

import os
if not parser:
parser = set_base_parser()
min_port, max_port = 49152, 65536
parser.add_argument('--port_in', type=int, default=random.randrange(min_port, max_port),

parser.add_argument('--port_in', type=int, default=random_port(-1),
help='port for input data, default a random port between [49152, 65536]')
parser.add_argument('--port_out', type=int, default=random.randrange(min_port, max_port),
parser.add_argument('--port_out', type=int, default=random_port(-1),
help='port for output data, default a random port between [49152, 65536]')
parser.add_argument('--host_in', type=str, default=BaseService.default_host,
help='host address for input')
Expand All @@ -158,8 +173,7 @@ def set_service_parser(parser=None):
parser.add_argument('--socket_out', type=SocketType.from_string, choices=list(SocketType),
default=SocketType.PUSH_BIND,
help='socket type for output port')
parser.add_argument('--port_ctrl', type=int,
default=int(os.environ.get('GNES_CONTROL_PORT', random.randrange(min_port, max_port))),
parser.add_argument('--port_ctrl', type=int, default=os.environ.get('GNES_CONTROL_PORT', random_port(-1)),
help='port for controlling the service, default a random port between [49152, 65536]')
parser.add_argument('--timeout', type=int, default=-1,
help='timeout (ms) of all communication, -1 for waiting forever')
Expand Down
6 changes: 3 additions & 3 deletions gnes/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ def __init__(self, args):
)
self.logger.info('waiting channel to be ready...')
grpc.channel_ready_future(self._channel).result()
self.logger.critical('gnes client ready!')

# create new stub
self.logger.info('create new stub...')
self._stub = gnes_pb2_grpc.GnesRPCStub(self._channel)

# attache response handler
self.handler._context = self
self.logger.critical('gnes client ready at %s:%d!' % (self.args.grpc_host, self.args.grpc_port))

def call(self, request):
resp = self._stub.call(request)
Expand All @@ -158,13 +158,13 @@ def _handler_response_default(self, msg: 'gnes_pb2.Response'):
pass

def __enter__(self):
self.open()
self.start()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def open(self):
def start(self):
pass

def close(self):
Expand Down
47 changes: 34 additions & 13 deletions gnes/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,33 @@


class CLIClient(GrpcClient):
def __init__(self, args):
def __init__(self, args, start_at_init: bool = True):
super().__init__(args)
getattr(self, self.args.mode)()
self.close()
self._bytes_generator = self._get_bytes_generator_from_args(args)
if start_at_init:
self.start()

@staticmethod
def _get_bytes_generator_from_args(args):
if args.txt_file:
all_bytes = (v.encode() for v in args.txt_file)
elif args.image_zip_file:
zipfile_ = zipfile.ZipFile(args.image_zip_file)
all_bytes = (zipfile_.open(v).read() for v in zipfile_.namelist())
elif args.video_zip_file:
zipfile_ = zipfile.ZipFile(args.video_zip_file)
all_bytes = (zipfile_.open(v).read() for v in zipfile_.namelist())
else:
all_bytes = None
return all_bytes

def start(self):
try:
getattr(self, self.args.mode)()
except Exception as ex:
self.logger.error(ex)
finally:
self.close()

def train(self):
with ProgressBar(task_name=self.args.mode) as p_bar:
Expand Down Expand Up @@ -64,18 +87,16 @@ def query_callback(self, req: 'gnes_pb2.Request', resp: 'gnes_pb2.Response'):

@property
def bytes_generator(self) -> Generator[bytes, None, None]:
if self.args.txt_file:
all_bytes = (v.encode() for v in self.args.txt_file)
elif self.args.image_zip_file:
zipfile_ = zipfile.ZipFile(self.args.image_zip_file)
all_bytes = (zipfile_.open(v).read() for v in zipfile_.namelist())
elif self.args.video_zip_file:
zipfile_ = zipfile.ZipFile(self.args.video_zip_file)
all_bytes = (zipfile_.open(v).read() for v in zipfile_.namelist())
if self._bytes_generator:
return self._bytes_generator
else:
raise AttributeError('--txt_file, --image_zip_file, --video_zip_file one must be given')
raise ValueError('bytes_generator is empty or not set')

return all_bytes
@bytes_generator.setter
def bytes_generator(self, bytes_gen: Generator[bytes, None, None]):
if self._bytes_generator:
self.logger.warning('bytes_generator is not empty, overrided')
self._bytes_generator = bytes_gen


class ProgressBar:
Expand Down
3 changes: 3 additions & 0 deletions gnes/composer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# COMPOSER WILL BE RETIRED IN THE FUTURE VERSION!!!
# COMPOSER WILL BE RETIRED IN THE FUTURE VERSION!!!
# COMPOSER WILL BE RETIRED IN THE FUTURE VERSION!!!
3 changes: 2 additions & 1 deletion gnes/encoder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
'VggishEncoder': 'audio.vggish',
'YouTube8MFeatureExtractor': 'video.yt8m_feature_extractor',
'YouTube8MEncoder': 'video.yt8m_model',
'QuantizerEncoder': 'numeric.quantizer'
'QuantizerEncoder': 'numeric.quantizer',
'CharEmbeddingEncoder': 'text.char'
}

register_all_class(_cls2file_map, 'encoder')
45 changes: 45 additions & 0 deletions gnes/encoder/text/char.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Tencent is pleased to support the open source community by making GNES available.
#
# Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import List

import numpy as np

from ..base import BaseTextEncoder
from ...helper import batching, as_numpy_array


class CharEmbeddingEncoder(BaseTextEncoder):
"""A random character embedding model. Only useful for testing"""
is_trained = True

def __init__(self, dim: int = 128, *args, **kwargs):
super().__init__(*args, **kwargs)
self.dim = dim
self.offset = 32
self.unknown_idx = 96
# in total 96 printable chars and 2 special chars = 98
self._char_embedding = np.random.random([97, dim])

@batching
@as_numpy_array
def encode(self, text: List[str], *args, **kwargs) -> List[np.ndarray]:
# tokenize text
sent_embed = []
for sent in text:
ids = [ord(c) - 32 if 32 <= ord(c) <= 127 else self.unknown_idx for c in sent]
sent_embed.append(np.mean(self._char_embedding[ids], axis=0))
return sent_embed
Loading

0 comments on commit 3aab341

Please sign in to comment.