From 9973f60065d8127bdc236e547faa2f44c4eb9afd Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Thu, 25 Jul 2019 17:33:01 +0800 Subject: [PATCH] refactor(preprocessor): rename singleton to unary --- .../component/img_preprocessor_singleton.yml | 2 +- gnes/composer/base.py | 2 +- gnes/preprocessor/__init__.py | 2 +- gnes/preprocessor/base.py | 2 +- tests/test_image_encoder.py | 4 +- tests/test_image_preprocessor.py | 16 +++--- tests/test_stream_grpc.py | 56 ++++++++++++++++++- tests/yaml/base-singleton-image-prep.yml | 2 +- tests/yaml/base-singleton-text-prep.yml | 2 +- 9 files changed, 71 insertions(+), 17 deletions(-) diff --git a/docker-compose/component/img_preprocessor_singleton.yml b/docker-compose/component/img_preprocessor_singleton.yml index e5f03a49..6964b86c 100644 --- a/docker-compose/component/img_preprocessor_singleton.yml +++ b/docker-compose/component/img_preprocessor_singleton.yml @@ -1,3 +1,3 @@ -!BaseSingletonPreprocessor +!BaseUnaryPreprocessor parameter: doc_type: 2 \ No newline at end of file diff --git a/gnes/composer/base.py b/gnes/composer/base.py index 89797008..41e77f23 100644 --- a/gnes/composer/base.py +++ b/gnes/composer/base.py @@ -292,7 +292,7 @@ def build_mermaid(all_layers: List['YamlComposer.Layer'], mermaid_leftright: boo # if len(last_layer.components) > 1: # self.mermaid_graph.append('\tend') - style = ['classDef FrontendCLS fill:#ffb347,stroke:#277CE8,stroke-width:1px,stroke-dasharray:5;', + style = ['classDef gRPCFrontendCLS fill:#FFAA04,stroke:#277CE8,stroke-width:1px;', 'classDef EncoderCLS fill:#27E1E8,stroke:#277CE8,stroke-width:1px;', 'classDef IndexerCLS fill:#27E1E8,stroke:#277CE8,stroke-width:1px;', 'classDef RouterCLS fill:#2BFFCB,stroke:#277CE8,stroke-width:1px;', diff --git a/gnes/preprocessor/__init__.py b/gnes/preprocessor/__init__.py index b28f56e4..112af956 100644 --- a/gnes/preprocessor/__init__.py +++ b/gnes/preprocessor/__init__.py @@ -26,7 +26,7 @@ 'VanillaSlidingPreprocessor': 'image.sliding_window', 'WeightedSlidingPreprocessor': 'image.sliding_window', 'SegmentPreprocessor': 'image.segmentation', - 'BaseSingletonPreprocessor': 'base', + 'BaseUnaryPreprocessor': 'base', 'BaseVideoPreprocessor': 'video.base', 'FFmpegPreprocessor': 'video.ffmpeg', 'ShotDetectPreprocessor': 'video.shotdetect', diff --git a/gnes/preprocessor/base.py b/gnes/preprocessor/base.py index 64cc3632..f2d0e61d 100644 --- a/gnes/preprocessor/base.py +++ b/gnes/preprocessor/base.py @@ -38,7 +38,7 @@ def apply(self, doc: 'gnes_pb2.Document') -> None: doc.doc_type = self.doc_type -class BaseSingletonPreprocessor(BasePreprocessor): +class BaseUnaryPreprocessor(BasePreprocessor): def __init__(self, doc_type: int, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/tests/test_image_encoder.py b/tests/test_image_encoder.py index 842e3e01..825ef23c 100644 --- a/tests/test_image_encoder.py +++ b/tests/test_image_encoder.py @@ -6,7 +6,7 @@ from gnes.encoder.image.base import BasePytorchEncoder from gnes.preprocessor.image.sliding_window import VanillaSlidingPreprocessor -from gnes.preprocessor.base import BaseSingletonPreprocessor +from gnes.preprocessor.base import BaseUnaryPreprocessor from gnes.proto import gnes_pb2, blob2array @@ -20,7 +20,7 @@ def img_process_for_test(dirname): test_img.append(d) test_img_all_preprocessor = [] - for preprocessor in [BaseSingletonPreprocessor(doc_type=gnes_pb2.Document.IMAGE), + for preprocessor in [BaseUnaryPreprocessor(doc_type=gnes_pb2.Document.IMAGE), VanillaSlidingPreprocessor()]: test_img_copy = copy.deepcopy(test_img) for img in test_img_copy: diff --git a/tests/test_image_preprocessor.py b/tests/test_image_preprocessor.py index 7ee724dc..c224a462 100644 --- a/tests/test_image_preprocessor.py +++ b/tests/test_image_preprocessor.py @@ -12,13 +12,13 @@ class TestProto(unittest.TestCase): def setUp(self): self.dirname = os.path.dirname(__file__) - self.singleton_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-singleton-image-prep.yml') + self.unary_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-unary-image-prep.yml') self.slidingwindow_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-vanilla_sldwin-image-prep.yml') self.segmentation_img_pre_yaml = os.path.join(self.dirname, 'yaml', 'base-segmentation-image-prep.yml') - def test_singleton_preprocessor_service_empty(self): + def test_unary_preprocessor_service_empty(self): args = set_preprocessor_service_parser().parse_args([ - '--yaml_path', self.singleton_img_pre_yaml + '--yaml_path', self.unary_img_pre_yaml ]) with PreprocessorService(args): pass @@ -37,9 +37,9 @@ def test_segmentation_preprocessor_service_empty(self): with PreprocessorService(args): pass - def test_singleton_preprocessor_service_echo(self): + def test_unary_preprocessor_service_echo(self): args = set_preprocessor_service_parser().parse_args([ - '--yaml_path', self.singleton_img_pre_yaml + '--yaml_path', self.unary_img_pre_yaml ]) c_args = _set_client_parser().parse_args([ '--port_in', str(args.port_out), @@ -94,9 +94,9 @@ def test_segmentation_preprocessor_service_echo(self): r = client.recv_message() # print(r) - def test_singleton_preprocessor_service_realdata(self): + def test_unary_preprocessor_service_realdata(self): args = set_preprocessor_service_parser().parse_args([ - '--yaml_path', self.singleton_img_pre_yaml + '--yaml_path', self.unary_img_pre_yaml ]) c_args = _set_client_parser().parse_args([ '--port_in', str(args.port_out), @@ -111,7 +111,7 @@ def test_singleton_preprocessor_service_realdata(self): msg.request.index.CopyFrom(req.index) client.send_message(msg) r = client.recv_message() - self.assertEqual(r.envelope.routes[0].service, 'PreprocessorService:BaseSingletonPreprocessor') + self.assertEqual(r.envelope.routes[0].service, 'PreprocessorService:BaseUnaryPreprocessor') for d in r.request.index.docs: self.assertEqual(len(d.chunks), 1) self.assertEqual(len(blob2array(d.chunks[0].blob).shape), 3) diff --git a/tests/test_stream_grpc.py b/tests/test_stream_grpc.py index 576c14eb..949df74a 100644 --- a/tests/test_stream_grpc.py +++ b/tests/test_stream_grpc.py @@ -1,4 +1,5 @@ import os +import time import unittest.mock import grpc @@ -6,15 +7,36 @@ from gnes.cli.parser import set_grpc_frontend_parser, set_router_service_parser from gnes.helper import TimeContext from gnes.proto import RequestGenerator, gnes_pb2_grpc -from gnes.service.base import SocketType +from gnes.service.base import SocketType, MessageHandler, BaseService as BS from gnes.service.grpc import GRPCFrontend from gnes.service.router import RouterService +class Router1(RouterService): + handler = MessageHandler(BS.handler) + + @handler.register(NotImplementedError) + def _handler_default(self, msg: 'gnes_pb2.Message'): + self.logger.info('im doing fancy jobs...') + time.sleep(2) + super()._handler_default(msg) + + +class Router2(RouterService): + handler = MessageHandler(BS.handler) + + @handler.register(NotImplementedError) + def _handler_default(self, msg: 'gnes_pb2.Message'): + self.logger.info('im doing stupid jobs...') + time.sleep(6) + super()._handler_default(msg) + + class TestStreamgRPC(unittest.TestCase): def setUp(self): self.all_bytes = [b'abc', b'def', b'cde'] * 10 + self.all_bytes2 = [b'abc', b'def', b'cde'] @unittest.mock.patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''}) def test_grpc_frontend(self): @@ -43,3 +65,35 @@ def test_grpc_frontend(self): with TimeContext('async call'): # immeidiately returns 0.001 s resp = stub.RequestStreamCall.future(RequestGenerator.train(self.all_bytes, 1)) self.assertEqual(resp.result().request_id, str(len(self.all_bytes))) + + @unittest.mock.patch.dict(os.environ, {'http_proxy': '', 'https_proxy': ''}) + def test_async_block(self): + args = set_grpc_frontend_parser().parse_args([ + '--grpc_host', '127.0.0.1', + ]) + + p1_args = set_router_service_parser().parse_args([ + '--port_in', str(args.port_out), + '--port_out', '8899', + '--socket_in', str(SocketType.PULL_CONNECT), + '--socket_out', str(SocketType.PUSH_CONNECT), + ]) + + p2_args = set_router_service_parser().parse_args([ + '--port_in', str(p1_args.port_out), + '--port_out', str(args.port_in), + '--socket_in', str(SocketType.PULL_BIND), + '--socket_out', str(SocketType.PUSH_CONNECT), + ]) + + with Router1(p1_args), Router2(p2_args), GRPCFrontend(args), grpc.insecure_channel( + '%s:%s' % (args.grpc_host, args.grpc_port), + options=[('grpc.max_send_message_length', 70 * 1024 * 1024), + ('grpc.max_receive_message_length', 70 * 1024 * 1024)]) as channel: + stub = gnes_pb2_grpc.GnesRPCStub(channel) + with TimeContext('sync call'): # about 5s + resp = stub.RequestStreamCall.future(RequestGenerator.train(self.all_bytes, 1)) + + self.assertEqual(resp.result().request_id, str(len(self.all_bytes))) + + self.assertEqual(resp.request_id, str(len(self.all_bytes2))) # idx start with 0, but +1 for final FLUSH diff --git a/tests/yaml/base-singleton-image-prep.yml b/tests/yaml/base-singleton-image-prep.yml index e5f03a49..6964b86c 100644 --- a/tests/yaml/base-singleton-image-prep.yml +++ b/tests/yaml/base-singleton-image-prep.yml @@ -1,3 +1,3 @@ -!BaseSingletonPreprocessor +!BaseUnaryPreprocessor parameter: doc_type: 2 \ No newline at end of file diff --git a/tests/yaml/base-singleton-text-prep.yml b/tests/yaml/base-singleton-text-prep.yml index 47a021e3..3e7613b8 100644 --- a/tests/yaml/base-singleton-text-prep.yml +++ b/tests/yaml/base-singleton-text-prep.yml @@ -1,3 +1,3 @@ -!BaseSingletonPreprocessor +!BaseUnaryPreprocessor parameter: doc_type: 1 \ No newline at end of file