From e750cf9f7459d441d2ecce6d2aa69cd2d0e6c71d Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Sun, 29 Sep 2019 18:17:26 +0800 Subject: [PATCH 1/2] feat(frontend): add max pending request to frontend --- gnes/cli/parser.py | 2 ++ gnes/service/frontend.py | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index 764c4ee6..11604305 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -330,6 +330,8 @@ def set_frontend_parser(parser=None): help='maximum concurrent connections allowed') parser.add_argument('--dump_route', type=argparse.FileType('w', encoding='utf8'), help='dumping route information to a file') + parser.add_argument('--max_pending_request', type=int, default=100, + help='maximum number of pending requests allowed, when exceed wait until we receive the response') return parser diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 646e9813..142a7a1b 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -20,7 +20,6 @@ import grpc from google.protobuf.json_format import MessageToJson -from .. import __version__, __proto_version__ from ..client.base import ZmqClient from ..helper import set_logger, make_route_table from ..proto import gnes_pb2_grpc, gnes_pb2, router2str, add_route, add_version @@ -105,18 +104,21 @@ def Search(self, request, context): def StreamCall(self, request_iterator, context): with self.zmq_context as zmq_client: - num_request = 0 + pending_request = 0 for request in request_iterator: zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) - num_request += 1 + pending_request += 1 - if zmq_client.receiver.poll(1): - msg = zmq_client.recv_message(**self.send_recv_kwargs) - num_request -= 1 - yield self.remove_envelope(msg) + while pending_request > self.args.max_pending_request: + # too many pending requests, the whole network is pretty busy + # slow down the sending rate by waiting responses + if zmq_client.receiver.poll(1): + msg = zmq_client.recv_message(**self.send_recv_kwargs) + pending_request -= 1 + yield self.remove_envelope(msg) - for _ in range(num_request): + for _ in range(pending_request): msg = zmq_client.recv_message(**self.send_recv_kwargs) yield self.remove_envelope(msg) From bbf1ed8e392be2a68f697ce8b8d558acf71d68ad Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Sun, 29 Sep 2019 18:59:02 +0800 Subject: [PATCH 2/2] feat(frontend): add max pending request to frontend --- gnes/service/frontend.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 142a7a1b..215f85ef 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -58,6 +58,7 @@ def __init__(self, args): check_version=self.args.check_version, timeout=self.args.timeout, squeeze_pb=self.args.squeeze_pb) + self.pending_request = 0 def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'): msg = gnes_pb2.Message() @@ -103,24 +104,27 @@ def Search(self, request, context): return self.Call(request, context) def StreamCall(self, request_iterator, context): + self.pending_request = 0 + + def get_response(num_recv, blocked=False): + for _ in range(num_recv): + if blocked or zmq_client.receiver.poll(1): + msg = zmq_client.recv_message(**self.send_recv_kwargs) + self.pending_request -= 1 + yield self.remove_envelope(msg) + with self.zmq_context as zmq_client: - pending_request = 0 for request in request_iterator: zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) - pending_request += 1 - - while pending_request > self.args.max_pending_request: - # too many pending requests, the whole network is pretty busy - # slow down the sending rate by waiting responses - if zmq_client.receiver.poll(1): - msg = zmq_client.recv_message(**self.send_recv_kwargs) - pending_request -= 1 - yield self.remove_envelope(msg) - - for _ in range(pending_request): - msg = zmq_client.recv_message(**self.send_recv_kwargs) - yield self.remove_envelope(msg) + self.pending_request += 1 + + num_recv = max(self.pending_request - self.args.max_pending_request, 1) + + # switch to blocked recv when too many pending requests + yield from get_response(num_recv, num_recv > 1) + + yield from get_response(self.pending_request, blocked=True) class ZmqContext: """The zmq context class."""