From 8fbb094526eddc0ed1233061b2ca018832ecd060 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Mon, 23 Sep 2019 14:54:53 +0800 Subject: [PATCH] feat(router): add a block router for benchmarking --- gnes/cli/parser.py | 1 - gnes/router/map.py | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index 2474e5ff..8979297b 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -140,7 +140,6 @@ def set_composer_flask_parser(parser=None): def set_service_parser(parser=None): from ..service.base import SocketType, BaseService, ParallelType import random - import uuid if not parser: parser = set_base_parser() min_port, max_port = 49152, 65536 diff --git a/gnes/router/map.py b/gnes/router/map.py index af2c7b16..b32009f9 100644 --- a/gnes/router/map.py +++ b/gnes/router/map.py @@ -20,7 +20,22 @@ from ..proto import gnes_pb2 +class BlockRouter(BaseMapRouter): + """Wait for 'sleep_sec' seconds and forward messages, useful for benchmark""" + + def __init__(self, sleep_sec: int = 5, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sleep_sec = sleep_sec + + def apply(self, msg: 'gnes_pb2.Message', *args, **kwargs): + import time + time.sleep(self.sleep_sec) + + class PublishRouter(BaseMapRouter): + """Copy a message 'num_part' time and forward it, useful for PUB-SUB sockets. + 'num_part' is an indicator for downstream sync-barrier, e.g. a ReduceRouter + """ def __init__(self, num_part: int, *args, **kwargs): super().__init__(*args, **kwargs)