From a1a2b020ccd99f3e80d0adaad9e8c68c1220d592 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Wed, 24 Jul 2019 19:15:03 +0800 Subject: [PATCH] refactor(proto): refactor request stream call --- gnes/cli/parser.py | 3 +++ gnes/client/cli.py | 12 +++++----- gnes/client/http.py | 4 +--- gnes/proto/__init__.py | 14 +++++++++--- gnes/proto/gnes.proto | 6 ++--- gnes/proto/gnes_pb2.py | 21 +++++------------ gnes/proto/gnes_pb2_grpc.py | 37 +++++++++--------------------- gnes/service/grpc.py | 31 ++++++++++++------------- shell/make-proto.sh | 4 ++-- tests/test_stream_grpc.py | 45 +++++++++++++++++++++++++++++++++++++ 10 files changed, 99 insertions(+), 78 deletions(-) create mode 100644 tests/test_stream_grpc.py diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index b6aea372..ab08d760 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -206,10 +206,13 @@ def _set_grpc_parser(parser=None): def set_grpc_frontend_parser(parser=None): + from ..service.base import SocketType if not parser: parser = set_base_parser() _set_client_parser(parser) _set_grpc_parser(parser) + parser.set_defaults(socket_in=SocketType.PULL_BIND, + socket_out=SocketType.PUSH_BIND) parser.add_argument('--max_concurrency', type=int, default=10, help='maximum concurrent client allowed') parser.add_argument('--max_send_size', type=int, default=100, diff --git a/gnes/client/cli.py b/gnes/client/cli.py index 3a90b4aa..2b6a8b34 100644 --- a/gnes/client/cli.py +++ b/gnes/client/cli.py @@ -41,17 +41,15 @@ def __init__(self, args): stub = gnes_pb2_grpc.GnesRPCStub(channel) if args.mode == 'train': - for req in RequestGenerator.train(all_bytes, args.batch_size): - resp = stub._Call(req) - print(resp) + resp = stub.RequestStreamCall(RequestGenerator.train(all_bytes, args.batch_size)) + print(resp) elif args.mode == 'index': - for req in RequestGenerator.index(all_bytes, args.batch_size): - resp = stub._Call(req) - print(resp) + resp = stub.RequestStreamCall(RequestGenerator.train(all_bytes, args.batch_size)) + print(resp) elif args.mode == 'query': for idx, q in enumerate(all_bytes): for req in RequestGenerator.query(q, args.top_k): - resp = stub._Call(req) + resp = stub.Call(req) print(resp) print('query %d result: %s' % (idx, resp)) input('press any key to continue...') diff --git a/gnes/client/http.py b/gnes/client/http.py index fc1027cb..a66c05a0 100644 --- a/gnes/client/http.py +++ b/gnes/client/http.py @@ -91,9 +91,7 @@ async def init(loop): return srv def stub_call(req): - res_f = None - for r in req: - res_f = stub._Call(r) + res_f = stub.RequestStreamCall(req) return json.loads(MessageToJson(res_f)) with grpc.insecure_channel( diff --git a/gnes/proto/__init__.py b/gnes/proto/__init__.py index 058768fd..a13e11a9 100644 --- a/gnes/proto/__init__.py +++ b/gnes/proto/__init__.py @@ -28,33 +28,41 @@ class RequestGenerator: @staticmethod - def index(data: List[bytes], batch_size: int = 0, *args, **kwargs): + def index(data: List[bytes], batch_size: int = 0, start_id: int = 0, *args, **kwargs): + for pi in batch_iterator(data, batch_size): req = gnes_pb2.Request() + req.request_id = start_id for raw_bytes in pi: d = req.index.docs.add() d.raw_bytes = raw_bytes d.weight = 1.0 yield req + start_id += 1 @staticmethod - def train(data: List[bytes], batch_size: int = 0, *args, **kwargs): + def train(data: List[bytes], batch_size: int = 0, start_id: int = 0, *args, **kwargs): for pi in batch_iterator(data, batch_size): req = gnes_pb2.Request() + req.request_id = str(start_id) for raw_bytes in pi: d = req.train.docs.add() d.raw_bytes = raw_bytes yield req + start_id += 1 req = gnes_pb2.Request() + req.request_id = str(start_id) req.train.flush = True yield req + start_id += 1 @staticmethod - def query(query: bytes, top_k: int, *args, **kwargs): + def query(query: bytes, top_k: int, start_id: int = 0, *args, **kwargs): if top_k <= 0: raise ValueError('"top_k: %d" is not a valid number' % top_k) req = gnes_pb2.Request() + req.request_id = start_id req.search.query.raw_bytes = query req.search.top_k = top_k yield req diff --git a/gnes/proto/gnes.proto b/gnes/proto/gnes.proto index 8ca6df27..1ee72b89 100644 --- a/gnes/proto/gnes.proto +++ b/gnes/proto/gnes.proto @@ -205,11 +205,9 @@ service GnesRPC { } rpc Query (Request) returns (Response) { } - rpc _Call (Request) returns (Response) { + rpc Call (Request) returns (Response) { } - rpc TrainStream (stream Request) returns (Response) { - } - rpc IndexStream (stream Request) returns (Response) { + rpc RequestStreamCall (stream Request) returns (Response) { } } diff --git a/gnes/proto/gnes_pb2.py b/gnes/proto/gnes_pb2.py index 41ac07d0..b50bfb3d 100644 --- a/gnes/proto/gnes_pb2.py +++ b/gnes/proto/gnes_pb2.py @@ -21,7 +21,7 @@ package='gnes', syntax='proto3', serialized_options=None, - serialized_pb=_b('\n\ngnes.proto\x12\x04gnes\x1a\x1fgoogle/protobuf/timestamp.proto\"9\n\x07NdArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x11\n\x05shape\x18\x02 \x03(\rB\x02\x10\x01\x12\r\n\x05\x64type\x18\x03 \x01(\t\"\xbc\x01\n\x05\x43hunk\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x0e\n\x04text\x18\x02 \x01(\tH\x00\x12\x1d\n\x04\x62lob\x18\x03 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\x11\n\toffset_1d\x18\x04 \x01(\r\x12)\n\toffset_nd\x18\x05 \x01(\x0b\x32\x16.gnes.Chunk.Coordinate\x12\x0e\n\x06weight\x18\x06 \x01(\x02\x1a\x1b\n\nCoordinate\x12\r\n\x01x\x18\x01 \x03(\rB\x02\x10\x01\x42\t\n\x07\x63ontent\"\xe2\x02\n\x08\x44ocument\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x1b\n\x06\x63hunks\x18\x02 \x03(\x0b\x32\x0b.gnes.Chunk\x12\'\n\x10\x63hunk_embeddings\x18\x03 \x01(\x0b\x32\r.gnes.NdArray\x12(\n\x08\x64oc_type\x18\x04 \x01(\x0e\x32\x16.gnes.Document.DocType\x12\x11\n\tmeta_info\x18\x05 \x01(\x0c\x12\x12\n\x08raw_text\x18\x06 \x01(\tH\x00\x12\"\n\traw_image\x18\x07 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\"\n\traw_video\x18\x08 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\x13\n\traw_bytes\x18\t \x01(\x0cH\x00\x12\x0e\n\x06weight\x18\n \x01(\x02\"6\n\x07\x44ocType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04TEXT\x10\x01\x12\t\n\x05IMAGE\x10\x02\x12\t\n\x05VIDEO\x10\x03\x42\n\n\x08raw_data\"\xd4\x01\n\x08\x45nvelope\x12\x11\n\tclient_id\x18\x01 \x01(\t\x12\x12\n\nrequest_id\x18\x02 \x01(\t\x12\x0f\n\x07part_id\x18\x03 \x01(\r\x12\x10\n\x08num_part\x18\x04 \x01(\r\x12\x0f\n\x07timeout\x18\x05 \x01(\r\x12$\n\x06routes\x18\x06 \x03(\x0b\x32\x14.gnes.Envelope.route\x1aG\n\x05route\x12\x0f\n\x07service\x18\x01 \x01(\t\x12-\n\ttimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"y\n\x07Message\x12 \n\x08\x65nvelope\x18\x01 \x01(\x0b\x32\x0e.gnes.Envelope\x12 \n\x07request\x18\x02 \x01(\x0b\x32\r.gnes.RequestH\x00\x12\"\n\x08response\x18\x03 \x01(\x0b\x32\x0e.gnes.ResponseH\x00\x42\x06\n\x04\x62ody\"\xf6\x03\n\x07Request\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12+\n\x05train\x18\x02 \x01(\x0b\x32\x1a.gnes.Request.TrainRequestH\x00\x12+\n\x05index\x18\x03 \x01(\x0b\x32\x1a.gnes.Request.IndexRequestH\x00\x12,\n\x06search\x18\x04 \x01(\x0b\x32\x1a.gnes.Request.QueryRequestH\x00\x12/\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1c.gnes.Request.ControlRequestH\x00\x1a;\n\x0cTrainRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x12\r\n\x05\x66lush\x18\x02 \x01(\x08\x1a,\n\x0cIndexRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x1a<\n\x0cQueryRequest\x12\x1d\n\x05query\x18\x01 \x01(\x0b\x32\x0e.gnes.Document\x12\r\n\x05top_k\x18\x02 \x01(\r\x1am\n\x0e\x43ontrolRequest\x12\x35\n\x07\x63ommand\x18\x01 \x01(\x0e\x32$.gnes.Request.ControlRequest.Command\"$\n\x07\x43ommand\x12\r\n\tTERMINATE\x10\x00\x12\n\n\x06STATUS\x10\x01\x42\x06\n\x04\x62ody\"\xc4\x06\n\x08Response\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12-\n\x05train\x18\x02 \x01(\x0b\x32\x1c.gnes.Response.TrainResponseH\x00\x12-\n\x05index\x18\x03 \x01(\x0b\x32\x1c.gnes.Response.IndexResponseH\x00\x12.\n\x06search\x18\x04 \x01(\x0b\x32\x1c.gnes.Response.QueryResponseH\x00\x12\x31\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1e.gnes.Response.ControlResponseH\x00\x1a\x36\n\rTrainResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x36\n\rIndexResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x38\n\x0f\x43ontrolResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x81\x03\n\rQueryResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x12\r\n\x05top_k\x18\x02 \x01(\r\x12?\n\x0ctopk_results\x18\x03 \x03(\x0b\x32).gnes.Response.QueryResponse.ScoredResult\x12\x39\n\x05level\x18\x04 \x01(\x0e\x32*.gnes.Response.QueryResponse.ResponseLevel\x1a{\n\x0cScoredResult\x12\x1c\n\x05\x63hunk\x18\x01 \x01(\x0b\x32\x0b.gnes.ChunkH\x00\x12\x1d\n\x03\x64oc\x18\x02 \x01(\x0b\x32\x0e.gnes.DocumentH\x00\x12\r\n\x05score\x18\x03 \x01(\x02\x12\x17\n\x0fscore_explained\x18\x04 \x01(\tB\x06\n\x04\x62ody\"A\n\rResponseLevel\x12\t\n\x05\x43HUNK\x10\x00\x12\x17\n\x13\x44OCUMENT_NOT_FILLED\x10\x01\x12\x0c\n\x08\x44OCUMENT\x10\x02\"-\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07PENDING\x10\x02\x42\x06\n\x04\x62ody2\x95\x02\n\x07GnesRPC\x12(\n\x05Train\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Index\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Query\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05_Call\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12\x30\n\x0bTrainStream\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00(\x01\x12\x30\n\x0bIndexStream\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00(\x01\x62\x06proto3') + serialized_pb=_b('\n\ngnes.proto\x12\x04gnes\x1a\x1fgoogle/protobuf/timestamp.proto\"9\n\x07NdArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x11\n\x05shape\x18\x02 \x03(\rB\x02\x10\x01\x12\r\n\x05\x64type\x18\x03 \x01(\t\"\xbc\x01\n\x05\x43hunk\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x0e\n\x04text\x18\x02 \x01(\tH\x00\x12\x1d\n\x04\x62lob\x18\x03 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\x11\n\toffset_1d\x18\x04 \x01(\r\x12)\n\toffset_nd\x18\x05 \x01(\x0b\x32\x16.gnes.Chunk.Coordinate\x12\x0e\n\x06weight\x18\x06 \x01(\x02\x1a\x1b\n\nCoordinate\x12\r\n\x01x\x18\x01 \x03(\rB\x02\x10\x01\x42\t\n\x07\x63ontent\"\xe2\x02\n\x08\x44ocument\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x1b\n\x06\x63hunks\x18\x02 \x03(\x0b\x32\x0b.gnes.Chunk\x12\'\n\x10\x63hunk_embeddings\x18\x03 \x01(\x0b\x32\r.gnes.NdArray\x12(\n\x08\x64oc_type\x18\x04 \x01(\x0e\x32\x16.gnes.Document.DocType\x12\x11\n\tmeta_info\x18\x05 \x01(\x0c\x12\x12\n\x08raw_text\x18\x06 \x01(\tH\x00\x12\"\n\traw_image\x18\x07 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\"\n\traw_video\x18\x08 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\x13\n\traw_bytes\x18\t \x01(\x0cH\x00\x12\x0e\n\x06weight\x18\n \x01(\x02\"6\n\x07\x44ocType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04TEXT\x10\x01\x12\t\n\x05IMAGE\x10\x02\x12\t\n\x05VIDEO\x10\x03\x42\n\n\x08raw_data\"\xd4\x01\n\x08\x45nvelope\x12\x11\n\tclient_id\x18\x01 \x01(\t\x12\x12\n\nrequest_id\x18\x02 \x01(\t\x12\x0f\n\x07part_id\x18\x03 \x01(\r\x12\x10\n\x08num_part\x18\x04 \x01(\r\x12\x0f\n\x07timeout\x18\x05 \x01(\r\x12$\n\x06routes\x18\x06 \x03(\x0b\x32\x14.gnes.Envelope.route\x1aG\n\x05route\x12\x0f\n\x07service\x18\x01 \x01(\t\x12-\n\ttimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"y\n\x07Message\x12 \n\x08\x65nvelope\x18\x01 \x01(\x0b\x32\x0e.gnes.Envelope\x12 \n\x07request\x18\x02 \x01(\x0b\x32\r.gnes.RequestH\x00\x12\"\n\x08response\x18\x03 \x01(\x0b\x32\x0e.gnes.ResponseH\x00\x42\x06\n\x04\x62ody\"\xf6\x03\n\x07Request\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12+\n\x05train\x18\x02 \x01(\x0b\x32\x1a.gnes.Request.TrainRequestH\x00\x12+\n\x05index\x18\x03 \x01(\x0b\x32\x1a.gnes.Request.IndexRequestH\x00\x12,\n\x06search\x18\x04 \x01(\x0b\x32\x1a.gnes.Request.QueryRequestH\x00\x12/\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1c.gnes.Request.ControlRequestH\x00\x1a;\n\x0cTrainRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x12\r\n\x05\x66lush\x18\x02 \x01(\x08\x1a,\n\x0cIndexRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x1a<\n\x0cQueryRequest\x12\x1d\n\x05query\x18\x01 \x01(\x0b\x32\x0e.gnes.Document\x12\r\n\x05top_k\x18\x02 \x01(\r\x1am\n\x0e\x43ontrolRequest\x12\x35\n\x07\x63ommand\x18\x01 \x01(\x0e\x32$.gnes.Request.ControlRequest.Command\"$\n\x07\x43ommand\x12\r\n\tTERMINATE\x10\x00\x12\n\n\x06STATUS\x10\x01\x42\x06\n\x04\x62ody\"\xc4\x06\n\x08Response\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12-\n\x05train\x18\x02 \x01(\x0b\x32\x1c.gnes.Response.TrainResponseH\x00\x12-\n\x05index\x18\x03 \x01(\x0b\x32\x1c.gnes.Response.IndexResponseH\x00\x12.\n\x06search\x18\x04 \x01(\x0b\x32\x1c.gnes.Response.QueryResponseH\x00\x12\x31\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1e.gnes.Response.ControlResponseH\x00\x1a\x36\n\rTrainResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x36\n\rIndexResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x38\n\x0f\x43ontrolResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x81\x03\n\rQueryResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x12\r\n\x05top_k\x18\x02 \x01(\r\x12?\n\x0ctopk_results\x18\x03 \x03(\x0b\x32).gnes.Response.QueryResponse.ScoredResult\x12\x39\n\x05level\x18\x04 \x01(\x0e\x32*.gnes.Response.QueryResponse.ResponseLevel\x1a{\n\x0cScoredResult\x12\x1c\n\x05\x63hunk\x18\x01 \x01(\x0b\x32\x0b.gnes.ChunkH\x00\x12\x1d\n\x03\x64oc\x18\x02 \x01(\x0b\x32\x0e.gnes.DocumentH\x00\x12\r\n\x05score\x18\x03 \x01(\x02\x12\x17\n\x0fscore_explained\x18\x04 \x01(\tB\x06\n\x04\x62ody\"A\n\rResponseLevel\x12\t\n\x05\x43HUNK\x10\x00\x12\x17\n\x13\x44OCUMENT_NOT_FILLED\x10\x01\x12\x0c\n\x08\x44OCUMENT\x10\x02\"-\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07PENDING\x10\x02\x42\x06\n\x04\x62ody2\xe8\x01\n\x07GnesRPC\x12(\n\x05Train\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Index\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Query\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12\'\n\x04\x43\x61ll\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12\x36\n\x11RequestStreamCall\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00(\x01\x62\x06proto3') , dependencies=[google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,]) @@ -1238,7 +1238,7 @@ index=0, serialized_options=None, serialized_start=2343, - serialized_end=2620, + serialized_end=2575, methods=[ _descriptor.MethodDescriptor( name='Train', @@ -1268,8 +1268,8 @@ serialized_options=None, ), _descriptor.MethodDescriptor( - name='_Call', - full_name='gnes.GnesRPC._Call', + name='Call', + full_name='gnes.GnesRPC.Call', index=3, containing_service=None, input_type=_REQUEST, @@ -1277,23 +1277,14 @@ serialized_options=None, ), _descriptor.MethodDescriptor( - name='TrainStream', - full_name='gnes.GnesRPC.TrainStream', + name='RequestStreamCall', + full_name='gnes.GnesRPC.RequestStreamCall', index=4, containing_service=None, input_type=_REQUEST, output_type=_RESPONSE, serialized_options=None, ), - _descriptor.MethodDescriptor( - name='IndexStream', - full_name='gnes.GnesRPC.IndexStream', - index=5, - containing_service=None, - input_type=_REQUEST, - output_type=_RESPONSE, - serialized_options=None, - ), ]) _sym_db.RegisterServiceDescriptor(_GNESRPC) diff --git a/gnes/proto/gnes_pb2_grpc.py b/gnes/proto/gnes_pb2_grpc.py index 905a8260..5d6b519e 100644 --- a/gnes/proto/gnes_pb2_grpc.py +++ b/gnes/proto/gnes_pb2_grpc.py @@ -29,18 +29,13 @@ def __init__(self, channel): request_serializer=gnes__pb2.Request.SerializeToString, response_deserializer=gnes__pb2.Response.FromString, ) - self._Call = channel.unary_unary( - '/gnes.GnesRPC/_Call', + self.Call = channel.unary_unary( + '/gnes.GnesRPC/Call', request_serializer=gnes__pb2.Request.SerializeToString, response_deserializer=gnes__pb2.Response.FromString, ) - self.TrainStream = channel.stream_unary( - '/gnes.GnesRPC/TrainStream', - request_serializer=gnes__pb2.Request.SerializeToString, - response_deserializer=gnes__pb2.Response.FromString, - ) - self.IndexStream = channel.stream_unary( - '/gnes.GnesRPC/IndexStream', + self.RequestStreamCall = channel.stream_unary( + '/gnes.GnesRPC/RequestStreamCall', request_serializer=gnes__pb2.Request.SerializeToString, response_deserializer=gnes__pb2.Response.FromString, ) @@ -72,21 +67,14 @@ def Query(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def _Call(self, request, context): - # missing associated documentation comment in .proto file - pass - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def TrainStream(self, request_iterator, context): + def Call(self, request, context): # missing associated documentation comment in .proto file pass context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def IndexStream(self, request_iterator, context): + def RequestStreamCall(self, request_iterator, context): # missing associated documentation comment in .proto file pass context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -111,18 +99,13 @@ def add_GnesRPCServicer_to_server(servicer, server): request_deserializer=gnes__pb2.Request.FromString, response_serializer=gnes__pb2.Response.SerializeToString, ), - '_Call': grpc.unary_unary_rpc_method_handler( - servicer._Call, - request_deserializer=gnes__pb2.Request.FromString, - response_serializer=gnes__pb2.Response.SerializeToString, - ), - 'TrainStream': grpc.stream_unary_rpc_method_handler( - servicer.TrainStream, + 'Call': grpc.unary_unary_rpc_method_handler( + servicer.Call, request_deserializer=gnes__pb2.Request.FromString, response_serializer=gnes__pb2.Response.SerializeToString, ), - 'IndexStream': grpc.stream_unary_rpc_method_handler( - servicer.IndexStream, + 'RequestStreamCall': grpc.stream_unary_rpc_method_handler( + servicer.RequestStreamCall, request_deserializer=gnes__pb2.Request.FromString, response_serializer=gnes__pb2.Response.SerializeToString, ), diff --git a/gnes/service/grpc.py b/gnes/service/grpc.py index 1f167317..d5df3001 100644 --- a/gnes/service/grpc.py +++ b/gnes/service/grpc.py @@ -29,7 +29,7 @@ __all__ = ['GRPCFrontend'] -class ZmqContext(object): +class ZmqContext: """The zmq context class.""" def __init__(self, args): @@ -111,32 +111,29 @@ def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'): msg.request.CopyFrom(body) return msg - def _Call(self, request, context): + def remove_envelope(self, m: 'gnes_pb2.Message'): + resp = m.response + resp.request_id = m.envelope.request_id + return resp + + def Call(self, request, context): self.logger.info('received a new request: %s' % request.request_id or 'EMPTY_REQUEST_ID') with self.zmq_context as zmq_client: - msg = self.add_envelope(request, zmq_client) - zmq_client.send_message(msg, self.args.timeout) - resp = zmq_client.recv_message(self.args.timeout) - self.logger.info("received message done!") - return resp.response + zmq_client.send_message(self.add_envelope(request, zmq_client), self.args.timeout) + return self.remove_envelope(zmq_client.recv_message(self.args.timeout)) def Train(self, request, context): - return self._Call(request, context) + return self.Call(request, context) def Index(self, request, context): - return self._Call(request, context) + return self.Call(request, context) def Search(self, request, context): - return self._Call(request, context) - - def TrainStream(self, request_iterator, context): - for request in request_iterator: - ret = self._Call(request, context) - return ret + return self.Call(request, context) - def IndexStream(self, request_iterator, context): + def RequestStreamCall(self, request_iterator, context): for request in request_iterator: - ret = self._Call(request, context) + ret = self.Call(request, context) return ret diff --git a/shell/make-proto.sh b/shell/make-proto.sh index e1cd71a0..78f6c4e3 100755 --- a/shell/make-proto.sh +++ b/shell/make-proto.sh @@ -1,8 +1,8 @@ #!/usr/bin/env bash SRC_DIR=../gnes/proto/ -#PLUGIN_PATH=/Volumes/TOSHIBA-4T/Documents/grpc/bins/opt/grpc_python_plugin -PLUGIN_PATH=/user/local/grpc/bins/opt/grpc_python_plugin +PLUGIN_PATH=/Volumes/TOSHIBA-4T/Documents/grpc/bins/opt/grpc_python_plugin +#PLUGIN_PATH=/user/local/grpc/bins/opt/grpc_python_plugin protoc -I ${SRC_DIR} --python_out=${SRC_DIR} --grpc_python_out=${SRC_DIR} --plugin=protoc-gen-grpc_python=${PLUGIN_PATH} ${SRC_DIR}gnes.proto diff --git a/tests/test_stream_grpc.py b/tests/test_stream_grpc.py new file mode 100644 index 00000000..576c14eb --- /dev/null +++ b/tests/test_stream_grpc.py @@ -0,0 +1,45 @@ +import os +import unittest.mock + +import grpc + +from gnes.cli.parser import set_grpc_frontend_parser, set_router_service_parser +from gnes.helper import TimeContext +from gnes.proto import RequestGenerator, gnes_pb2_grpc +from gnes.service.base import SocketType +from gnes.service.grpc import GRPCFrontend +from gnes.service.router import RouterService + + +class TestStreamgRPC(unittest.TestCase): + + def setUp(self): + self.all_bytes = [b'abc', b'def', b'cde'] * 10 + + @unittest.mock.patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''}) + def test_grpc_frontend(self): + args = set_grpc_frontend_parser().parse_args([ + '--grpc_host', '127.0.0.1', + ]) + + p_args = set_router_service_parser().parse_args([ + '--port_in', str(args.port_out), + '--port_out', str(args.port_in), + '--socket_in', str(SocketType.PULL_CONNECT), + '--socket_out', str(SocketType.PUSH_CONNECT), + ]) + + with RouterService(p_args), GRPCFrontend(args), grpc.insecure_channel( + '%s:%s' % (args.grpc_host, args.grpc_port), + options=[('grpc.max_send_message_length', 70 * 1024 * 1024), + ('grpc.max_receive_message_length', 70 * 1024 * 1024)]) as channel: + stub = gnes_pb2_grpc.GnesRPCStub(channel) + with TimeContext('sync call'): # about 5s + resp = stub.RequestStreamCall(RequestGenerator.train(self.all_bytes, 1)) + + self.assertEqual(resp.request_id, str(len(self.all_bytes))) # idx start with 0, but +1 for final FLUSH + + # test async calls + with TimeContext('async call'): # immeidiately returns 0.001 s + resp = stub.RequestStreamCall.future(RequestGenerator.train(self.all_bytes, 1)) + self.assertEqual(resp.result().request_id, str(len(self.all_bytes)))