Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge branch 'master' into score_fn_
Browse files Browse the repository at this point in the history
  • Loading branch information
jemmyshin authored Sep 12, 2019
2 parents 0b22029 + 79d32db commit 441e910
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 106 deletions.
2 changes: 1 addition & 1 deletion gnes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@

# do not change this line manually
# this is managed by shell/make-proto.sh and updated on every execution
__proto_version__ = '0.0.6'
__proto_version__ = '0.0.8'
58 changes: 45 additions & 13 deletions gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,29 @@
import argparse


class ActionNoYes(argparse.Action):
def __init__(self, option_strings, dest, default=None, required=False, help=None):

if default is None:
raise ValueError('you must provide a default with yes/no action')
if len(option_strings) != 1:
raise ValueError('only single argument is allowed with yes/no action')
opt = option_strings[0]
if not opt.startswith('--'):
raise ValueError('yes/no arguments must be prefixed with --')

opt = opt[2:]
opts = ['--' + opt, '--no-' + opt]
super(ActionNoYes, self).__init__(opts, dest, nargs=0, const=None,
default=default, required=required, help=help)

def __call__(self, parser, namespace, values, option_strings=None):
if option_strings.startswith('--no-'):
setattr(namespace, self.dest, False)
else:
setattr(namespace, self.dest, True)


def resolve_py_path(path):
import os
if not os.path.exists(path):
Expand Down Expand Up @@ -54,7 +77,8 @@ def set_base_parser():
'It enables large-scale index and semantic search for text-to-text, image-to-image, '
'video-to-video and any content form. Visit %s for tutorials and documentations.' % (
colored('GNES v%s: Generic Neural Elastic Search' % __version__, 'green'),
colored('https://gnes.ai', 'cyan', attrs=['underline'])))
colored('https://gnes.ai', 'cyan', attrs=['underline'])),
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + __version__)
parser.add_argument('--verbose', action='store_true', default=False,
help='turn on detailed logging for debug')
Expand Down Expand Up @@ -116,6 +140,7 @@ def set_composer_flask_parser(parser=None):
def set_service_parser(parser=None):
from ..service.base import SocketType, BaseService, ParallelType
import random
import uuid
if not parser:
parser = set_base_parser()
min_port, max_port = 49152, 65536
Expand Down Expand Up @@ -151,9 +176,11 @@ def set_service_parser(parser=None):
parser.add_argument('--parallel_type', type=ParallelType.from_string, choices=list(ParallelType),
default=ParallelType.PUSH_NONBLOCK,
help='parallel type of the concurrent services')
parser.add_argument('--check_version', action='store_true', default=False,
parser.add_argument('--check_version', action=ActionNoYes, default=True,
help='comparing the GNES and proto version of incoming message with local setup, '
'mismatch raise an exception')
parser.add_argument('--identity', type=str, default=str(uuid.uuid4()).split('-')[0],
help='identity of the service, by default a random uuid string')
return parser


Expand Down Expand Up @@ -280,7 +307,7 @@ def set_frontend_parser(parser=None):
read_only=True)
parser.add_argument('--max_concurrency', type=int, default=10,
help='maximum concurrent connections allowed')
parser.add_argument('--show_route_table', action='store_true', default=False,
parser.add_argument('--route_table', action=ActionNoYes, default=True,
help='showing a route table with time cost after receiving the result')
return parser

Expand Down Expand Up @@ -352,27 +379,32 @@ def set_client_http_parser(parser=None):
def get_main_parser():
# create the top-level parser
parser = set_base_parser()
adf = argparse.ArgumentDefaultsHelpFormatter
sp = parser.add_subparsers(dest='cli', title='GNES sub-commands',
description='use "gnes [sub-command] --help" '
'to get detailed information about each sub-command')

# microservices
set_frontend_parser(sp.add_parser('frontend', help='start a frontend service'))
set_encoder_parser(sp.add_parser('encode', help='start an encoder service'))
set_indexer_parser(sp.add_parser('index', help='start an indexer service'))
set_router_parser(sp.add_parser('route', help='start a router service'))
set_preprocessor_parser(sp.add_parser('preprocess', help='start a preprocessor service'))
set_grpc_service_parser(sp.add_parser('grpc', help='start a general purpose grpc service'))
set_frontend_parser(sp.add_parser('frontend', help='start a frontend service', formatter_class=adf))
set_encoder_parser(sp.add_parser('encode', help='start an encoder service', formatter_class=adf))
set_indexer_parser(sp.add_parser('index', help='start an indexer service', formatter_class=adf))
set_router_parser(sp.add_parser('route', help='start a router service', formatter_class=adf))
set_preprocessor_parser(sp.add_parser('preprocess', help='start a preprocessor service', formatter_class=adf))
set_grpc_service_parser(sp.add_parser('grpc', help='start a general purpose grpc service', formatter_class=adf))

pp = sp.add_parser('client', help='start a GNES client of the selected type')
spp = pp.add_subparsers(dest='client', title='GNES client sub-commands',
description='use "gnes client [sub-command] --help" '
'to get detailed information about each client sub-command')
spp.required = True
# clients
set_client_http_parser(spp.add_parser('http', help='start a client that allows HTTP requests as input'))
set_client_cli_parser(spp.add_parser('cli', help='start a client that allows stdin as input'))
set_client_benchmark_parser(spp.add_parser('benchmark', help='start a client for benchmark and unittest'))
set_client_http_parser(
spp.add_parser('http', help='start a client that allows HTTP requests as input', formatter_class=adf))
set_client_cli_parser(spp.add_parser('cli', help='start a client that allows stdin as input', formatter_class=adf))
set_client_benchmark_parser(
spp.add_parser('benchmark', help='start a client for benchmark and unittest', formatter_class=adf))

# others
set_composer_flask_parser(sp.add_parser('compose', help='start a GNES Board to visualize YAML configs'))
set_composer_flask_parser(
sp.add_parser('compose', help='start a GNES Board to visualize YAML configs', formatter_class=adf))
return parser
1 change: 0 additions & 1 deletion gnes/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class ZmqClient:

def __init__(self, args):
self.args = args
self.identity = args.identity if 'identity' in args else None
self.logger = set_logger(self.__class__.__name__, self.args.verbose)
self.ctx = zmq.Context()
self.ctx.setsockopt(zmq.LINGER, 0)
Expand Down
59 changes: 30 additions & 29 deletions gnes/proto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from termcolor import colored

from . import gnes_pb2
from ..helper import batch_iterator
from ..helper import batch_iterator, default_logger

__all__ = ['RequestGenerator', 'send_message', 'recv_message', 'blob2array', 'array2blob', 'gnes_pb2', 'add_route']

Expand Down Expand Up @@ -103,33 +103,22 @@ def array2blob(x: np.ndarray) -> 'gnes_pb2.NdArray':


def router2str(m: 'gnes_pb2.Message') -> str:
route_str = []
for r in m.envelope.routes:
if r.num_replicas and r.num_replicas > 1:
route_str.append('%s%s' % (r.service, colored(' x%d' % r.num_replicas, 'yellow')))
else:
route_str.append(r.service)

route_str = [r.service for r in m.envelope.routes]
return colored('▸', 'green').join(route_str)


def add_route(evlp: 'gnes_pb2.Envelope', name: str):
def add_route(evlp: 'gnes_pb2.Envelope', name: str, identity: str):
r = evlp.routes.add()
r.service = name
r.start_time.GetCurrentTime()
r.service_identity = identity


def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message'], idx: int = -1):
r = msg.envelope.routes[idx]
if len(msg.envelope.routes) > 1:
msg.envelope.routes[idx - 1].service = '[%s]' % ', '.join([r.service for r in msg.envelope.routes])
r.num_replicas = len(prev_msgs)
r.first_start_time.CopyFrom(
sorted((m.envelope.routes[idx].start_time for m in prev_msgs),
key=lambda x: (x.seconds, x.nanos))[0])
r.last_end_time.CopyFrom(
sorted((m.envelope.routes[idx].end_time for m in prev_msgs),
key=lambda x: (x.seconds, x.nanos), reverse=True)[0])
def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message']):
# take unique routes by service identity
routes = {r.service_identity: r for m in prev_msgs for r in m.envelope.routes}
msg.envelope.ClearField('routes')
msg.envelope.routes.extend(sorted(routes.values(), key=lambda x: (x.start_time.seconds, x.start_time.nanos)))


def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1) -> None:
Expand Down Expand Up @@ -163,18 +152,30 @@ def recv_message(sock: 'zmq.Socket', timeout: int = -1, check_version: bool = Fa

if check_version and msg.envelope:
from .. import __version__, __proto_version__
if hasattr(msg.envelope, 'gnes_version') and __version__ != msg.envelope.gnes_version:
raise AttributeError('mismatched GNES version! '
'incoming message has GNES version %s, whereas local GNES version %s' % (
msg.envelope.gnes_version, __version__))
if hasattr(msg.envelope, 'proto_version') and __proto_version__ != msg.envelope.proto_version:
raise AttributeError('mismatched protobuf version! '
'incoming message has protobuf version %s, whereas local protobuf version %s' % (
msg.envelope.proto_version, __proto_version__))
if hasattr(msg.envelope, 'gnes_version'):
if not msg.envelope.gnes_version:
# only happen in unittest
default_logger.warning('incoming message contains empty "gnes_version", '
'you may ignore it in debug/unittest mode. '
'otherwise please check if frontend service set correct version')
elif __version__ != msg.envelope.gnes_version:
raise AttributeError('mismatched GNES version! '
'incoming message has GNES version %s, whereas local GNES version %s' % (
msg.envelope.gnes_version, __version__))
if hasattr(msg.envelope, 'proto_version'):
if not msg.envelope.proto_version:
# only happen in unittest
default_logger.warning('incoming message contains empty "proto_version", '
'you may ignore it in debug/unittest mode. '
'otherwise please check if frontend service set correct version')
elif __proto_version__ != msg.envelope.proto_version:
raise AttributeError('mismatched protobuf version! '
'incoming message has protobuf version %s, whereas local protobuf version %s' % (
msg.envelope.proto_version, __proto_version__))
if not hasattr(msg.envelope, 'proto_version') and not hasattr(msg.envelope, 'gnes_version'):
raise AttributeError('version_check=True locally, '
'but incoming message contains no version info in its envelope. '
'the message is probably sent from an outdated GNES service')
'the message is probably sent from a very outdated GNES version')
return msg

except ValueError:
Expand Down
2 changes: 1 addition & 1 deletion gnes/proto/gnes.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ message Envelope {
google.protobuf.Timestamp end_time = 3;
google.protobuf.Timestamp first_start_time = 4;
google.protobuf.Timestamp last_end_time = 5;
uint32 num_replicas = 6;
string service_identity = 6;
}
repeated route routes = 6;

Expand Down
Loading

0 comments on commit 441e910

Please sign in to comment.