Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #90 from gnes-ai/fix-composer-3
Browse files Browse the repository at this point in the history
feat(grpc): add a general purpose grpc service
  • Loading branch information
mergify[bot] authored Aug 9, 2019
2 parents edea3ed + 2dd9d4c commit 1c1e07e
Show file tree
Hide file tree
Showing 31 changed files with 969 additions and 195 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
<a href="#documentation">Documentation</a> •
<a href="#tutorial">Tutorial</a> •
<a href="#contributing">Contributing</a> •
<a href="./CHANGELOG.md">Release Notes</a>
<a href="./CHANGELOG.md">Release Notes</a>
<a href="https://hanxiao.github.io/2019/07/29/Generic-Neural-Elastic-Search-From-bert-as-service-and-Go-Way-Beyond/">Blog</a>
</p>

<h2 align="center">What is it</h2>
Expand Down
31 changes: 17 additions & 14 deletions gnes/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,41 @@
# limitations under the License.


def preprocess(args):
def _start_service(cls, args):
from ..service.base import ServiceManager
from ..service.preprocessor import PreprocessorService
with ServiceManager(PreprocessorService, args) as es:
with ServiceManager(cls, args) as es:
es.join()


def grpc(args):
from ..service.grpc import GRPCService
_start_service(GRPCService, args)


def preprocess(args):
from ..service.preprocessor import PreprocessorService
_start_service(PreprocessorService, args)


def encode(args):
from ..service.base import ServiceManager
from ..service.encoder import EncoderService
with ServiceManager(EncoderService, args) as es:
es.join()
_start_service(EncoderService, args)


def index(args):
from ..service.base import ServiceManager
from ..service.indexer import IndexerService
with ServiceManager(IndexerService, args) as es:
es.join()
_start_service(IndexerService, args)


def route(args):
from ..service.base import ServiceManager
from ..service.router import RouterService
with ServiceManager(RouterService, args) as es:
es.join()
_start_service(RouterService, args)


def frontend(args):
from ..service.grpc import GRPCFrontend
from gnes.service.frontend import FrontendService
import threading
with GRPCFrontend(args):
with FrontendService(args):
forever = threading.Event()
forever.wait()

Expand Down
40 changes: 34 additions & 6 deletions gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,36 @@ def _set_grpc_parser(parser=None):
type=int,
default=8800,
help='host port of the grpc service')
parser.add_argument('--max_message_size', type=int, default=100,
help='maximum send and receive size for grpc server in (MB)')
return parser


def set_grpc_frontend_parser(parser=None):
def set_grpc_service_parser(parser=None):
if not parser:
parser = set_base_parser()
set_service_parser(parser)
_set_grpc_parser(parser)
parser.add_argument('--pb2_path',
type=str,
required=True,
help='the path of the python file protocol buffer compiler')
parser.add_argument('--pb2_grpc_path',
type=str,
required=True,
help='the path of the python file generated by the gRPC Python protocol compiler plugin')
parser.add_argument('--stub_name',
type=str,
required=True,
help='the name of the gRPC Stub')
parser.add_argument('--api_name',
type=str,
required=True,
help='the api name for calling the stub')
return parser


def set_frontend_parser(parser=None):
from ..service.base import SocketType
if not parser:
parser = set_base_parser()
Expand All @@ -231,10 +257,6 @@ def set_grpc_frontend_parser(parser=None):
read_only=True)
parser.add_argument('--max_concurrency', type=int, default=10,
help='maximum concurrent client allowed')
parser.add_argument('--max_send_size', type=int, default=100,
help='maximum send size for grpc server in (MB)')
parser.add_argument('--max_receive_size', type=int, default=100,
help='maximum receive size for grpc server in (MB)')
return parser


Expand Down Expand Up @@ -288,12 +310,18 @@ def get_main_parser():
description='use "gnes [sub-command] --help" '
'to get detailed information about each sub-command')

set_grpc_frontend_parser(sp.add_parser('frontend', help='start a grpc frontend service'))
# microservices
set_frontend_parser(sp.add_parser('frontend', help='start a frontend service'))
set_indexer_service_parser(sp.add_parser('index', help='start an indexer service'))
set_loadable_service_parser(sp.add_parser('encode', help='start an encoder service'))
set_router_service_parser(sp.add_parser('route', help='start a router service'))
set_preprocessor_service_parser(sp.add_parser('preprocess', help='start a preprocessor service'))
set_grpc_service_parser(sp.add_parser('grpc', help='start a general purpose grpc service'))

# clients
set_http_service_parser(sp.add_parser('client_http', help='start a http service'))
set_cli_client_parser(sp.add_parser('client_cli', help='start a grpc client'))

# others
set_composer_flask_parser(sp.add_parser('compose', help='start a GNES Board and visualize YAML config'))
return parser
53 changes: 53 additions & 0 deletions gnes/client/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Tencent is pleased to support the open source community by making GNES available.
#
# Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import zmq

from ..helper import set_logger
from ..proto import send_message, gnes_pb2, recv_message
from ..service.base import build_socket


class ZmqClient:

def __init__(self, args):
self.args = args
self.identity = args.identity if 'identity' in args else None
self.logger = set_logger(self.__class__.__name__, self.args.verbose)
self.ctx = zmq.Context()
self.ctx.setsockopt(zmq.LINGER, 0)
self.receiver, recv_addr = build_socket(self.ctx, self.args.host_in, self.args.port_in, self.args.socket_in,
getattr(self, 'identity', None))
self.sender, send_addr = build_socket(self.ctx, self.args.host_out, self.args.port_out, self.args.socket_out,
getattr(self, 'identity', None))
self.logger.info('send via %s, receive via %s' % (send_addr, recv_addr))

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
self.sender.close()
self.receiver.close()
self.ctx.term()

def send_message(self, message: "gnes_pb2.Message", timeout: int = -1):
send_message(self.sender, message, timeout=timeout)

def recv_message(self, timeout: int = -1) -> gnes_pb2.Message:
return recv_message(self.receiver, timeout=timeout)
4 changes: 2 additions & 2 deletions gnes/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def __init__(self, args):

with grpc.insecure_channel(
'%s:%s' % (args.grpc_host, args.grpc_port),
options=[('grpc.max_send_message_length', 70 * 1024 * 1024),
('grpc.max_receive_message_length', 70 * 1024 * 1024)]) as channel:
options=[('grpc.max_send_message_length', args.max_message_size * 1024 * 1024),
('grpc.max_receive_message_length', args.max_message_size * 1024 * 1024)]) as channel:
stub = gnes_pb2_grpc.GnesRPCStub(channel)

if args.mode == 'train':
Expand Down
4 changes: 2 additions & 2 deletions gnes/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def stub_call(req):

with grpc.insecure_channel(
'%s:%s' % (self.args.grpc_host, self.args.grpc_port),
options=[('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024),
options=[('grpc.max_send_message_length', self.args.max_message_size * 1024 * 1024),
('grpc.max_receive_message_length', self.args.max_message_size * 1024 * 1024),
('grpc.keepalive_timeout_ms', 100 * 1000)]) as channel:
stub = gnes_pb2_grpc.GnesRPCStub(channel)
loop.run_until_complete(init(loop))
Expand Down
16 changes: 8 additions & 8 deletions gnes/composer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ruamel.yaml.comments import CommentedMap

from .. import __version__
from ..cli.parser import set_grpc_frontend_parser, \
from ..cli.parser import set_frontend_parser, \
set_router_service_parser, set_loadable_service_parser, set_preprocessor_service_parser, \
set_indexer_service_parser
from ..helper import set_logger
Expand All @@ -38,15 +38,15 @@ class YamlComposer:
'Encoder': 'encode',
'Router': 'route',
'Indexer': 'index',
'gRPCFrontend': 'frontend',
'Frontend': 'frontend',
'Preprocessor': 'preprocess'
}

comp2args = {
'Encoder': set_loadable_service_parser().parse_args(['--yaml_path', 'BaseEncoder']),
'Router': set_router_service_parser().parse_args(['--yaml_path', 'BaseRouter']),
'Indexer': set_indexer_service_parser().parse_args(['--yaml_path', 'BaseIndexer']),
'gRPCFrontend': set_grpc_frontend_parser().parse_args([]),
'Frontend': set_frontend_parser().parse_args([]),
'Preprocessor': set_preprocessor_service_parser().parse_args(['--yaml_path', 'BasePreprocessor'])
}

Expand Down Expand Up @@ -115,7 +115,7 @@ def __init__(self, args):

if 'services' in tmp:
self.add_layer()
self.add_comp(CommentedMap({'name': 'gRPCFrontend', 'grpc_port': self._port}))
self.add_comp(CommentedMap({'name': 'Frontend', 'grpc_port': self._port}))
for comp in tmp['services']:
self.add_layer()
if isinstance(comp, list):
Expand Down Expand Up @@ -158,9 +158,9 @@ def build_layers(self) -> List['YamlComposer.Layer']:
all_layers.append(copy.deepcopy(l))
all_layers[0] = copy.deepcopy(self._layers[0])

# gRPCfrontend should always on the bind role
# Frontend should always on the bind role
assert all_layers[0].is_single_component
assert all_layers[0].components[0]['name'] == 'gRPCFrontend'
assert all_layers[0].components[0]['name'] == 'Frontend'

if all_layers[0].components[0]['socket_in'] == str(SocketType.SUB_CONNECT):
# change to sub bind
Expand Down Expand Up @@ -244,7 +244,7 @@ def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str =
and (c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml')):
swarm_lines['services'][c_name]['configs'] = ['%s_yaml' % c_name]

if c['name'] == 'gRPCFrontend':
if c['name'] == 'Frontend':
swarm_lines['services'][c_name]['ports'] = ['%d:%d' % (c['grpc_port'], c['grpc_port'])]

if volumes:
Expand Down Expand Up @@ -312,7 +312,7 @@ def build_mermaid(all_layers: List['YamlComposer.Layer'], mermaid_leftright: boo
# if len(last_layer.components) > 1:
# self.mermaid_graph.append('\tend')

style = ['classDef gRPCFrontendCLS fill:#FFE0E0,stroke:#FFE0E0,stroke-width:1px;',
style = ['classDef FrontendCLS fill:#FFE0E0,stroke:#FFE0E0,stroke-width:1px;',
'classDef EncoderCLS fill:#FFDAAF,stroke:#FFDAAF,stroke-width:1px;',
'classDef IndexerCLS fill:#FFFBC1,stroke:#FFFBC1,stroke-width:1px;',
'classDef RouterCLS fill:#C9E8D2,stroke:#C9E8D2,stroke-width:1px;',
Expand Down
2 changes: 0 additions & 2 deletions gnes/encoder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# limitations under the License.




# A key-value map for Class to the (module)file it located in
from ..base import register_all_class

Expand Down
2 changes: 0 additions & 2 deletions gnes/encoder/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# limitations under the License.




from typing import List, Any

import numpy as np
Expand Down
1 change: 0 additions & 1 deletion gnes/indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.



# A key-value map for Class to the (module)file it located in
from ..base import register_all_class

Expand Down
1 change: 0 additions & 1 deletion gnes/indexer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.



from typing import List, Any, Union, Callable, Tuple

import numpy as np
Expand Down
1 change: 1 addition & 0 deletions gnes/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def build_socket(ctx: 'zmq.Context', host: str, port: int, socket_type: 'SocketT
}[socket_type]()

if socket_type.is_bind:
host = BaseService.default_host
if port is None:
sock.bind_to_random_port('tcp://%s' % host)
else:
Expand Down
Loading

0 comments on commit 1c1e07e

Please sign in to comment.