diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index da089722..342206ed 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -178,7 +178,8 @@ def set_service_parser(parser=None): parser.add_argument('--timeout', type=int, default=-1, help='timeout (ms) of all communication, -1 for waiting forever') parser.add_argument('--dump_interval', type=int, default=5, - help='serialize the service to a file every n seconds, -1 means --read_only') + help='serialize the model in the service every n seconds if model changes. ' + '-1 means --read_only. ') parser.add_argument('--read_only', action='store_true', default=False, help='do not allow the service to modify the model, ' 'dump_interval will be ignored') diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index b8ed4622..c158ea4e 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -62,13 +62,15 @@ class Flow: You can use `.add()` then `.build()` to customize your own workflow. For example: + .. highlight:: python .. code-block:: python f = (Flow(check_version=False, route_table=True) - .add(gfs.Router, yaml_path='BaseRouter') - .add(gfs.Router, yaml_path='BaseRouter') + .add(gfs.Preprocessor, yaml_path='BasePreprocessor') + .add(gfs.Encoder, yaml_path='BaseEncoder') .add(gfs.Router, yaml_path='BaseRouter')) + with f.build(backend='thread') as flow: flow.index() ... @@ -77,6 +79,7 @@ class Flow: Note the different default copy behaviors in `.add()` and `.build()`: `.add()` always copy the flow by default, whereas `.build()` modify the flow in place. You can change this behavior by giving an argument `copy_flow=False`. + """ _supported_orch = {'swarm', 'k8s'} _service2parser = { @@ -211,14 +214,14 @@ def add(self, service: 'Service', **kwargs) -> 'Flow': """ Add a service to the current flow object and return the new modified flow object - :param copy_flow: when set to true, then always copy the current flow - and do the modification on top of it then return, otherwise, do in-line modification + :param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend :param name: the name indentifier of the service, useful in 'service_in' and 'service_out' :param service_in: the name of the service(s) that this service receives data from. One can also use 'Service.Frontend' to indicate the connection with the frontend. :param service_out: the name of the service(s) that this service sends data to. One can also use 'Service.Frontend' to indicate the connection with the frontend. + :param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification :param kwargs: other keyword-value arguments that the service CLI supports :return: a (new) flow object with modification """ diff --git a/gnes/service/base.py b/gnes/service/base.py index cb5361a7..5db79e9d 100644 --- a/gnes/service/base.py +++ b/gnes/service/base.py @@ -320,6 +320,7 @@ def __init__(self, args): self.is_event_loop = self._get_event() self.is_model_changed = self._get_event() self.is_handler_done = self._get_event() + self.last_dump_time = time.perf_counter() self._model = None self.use_event_loop = True self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl) @@ -335,29 +336,20 @@ def run(self): except Exception as ex: self.logger.error(ex, exc_info=True) - def _start_auto_dump(self): - if self.args.dump_interval > 0 and not self.args.read_only: - self._auto_dump_thread = threading.Thread(target=self._auto_dump) - self._auto_dump_thread.setDaemon(1) - self._auto_dump_thread.start() - - def _auto_dump(self): - while self.is_event_loop.is_set(): - if self.is_model_changed.is_set(): - self.is_model_changed.clear() - self.logger.info( - 'auto-dumping the new change of the model every %ds...' % self.args.dump_interval) - self.dump() - time.sleep(self.args.dump_interval) - - def dump(self): - if not self.args.read_only: - if self._model: - self.logger.info('dumping changes to the model...') - self._model.dump() - self.logger.info('dumping finished!') - else: - self.logger.info('no dumping as "read_only" set to true.') + def dump(self, respect_dump_interval: bool = True): + if (not self.args.read_only + and self.args.dump_interval > 0 + and self._model + and self.is_model_changed.is_set() + and (respect_dump_interval + and (time.perf_counter() - self.last_dump_time) > self.args.dump_interval) + or not respect_dump_interval): + self.is_model_changed.clear() + self.logger.info('dumping changes to the model, %3.0fs since last the dump' + % (time.perf_counter() - self.last_dump_time)) + self._model.dump() + self.last_dump_time = time.perf_counter() + self.logger.info('dumping finished! next dump will start in at least %3.0fs' % self.args.dump_interval) @handler.register_hook(hook_type='post') def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', *args, **kwargs): @@ -414,17 +406,17 @@ def _run(self, ctx): self.post_init() self.is_ready.set() self.is_event_loop.set() - self._start_auto_dump() self.logger.critical('ready and listening') while self.is_event_loop.is_set(): - pull_sock = None - socks = dict(poller.poll()) + socks = dict(poller.poll(1)) if socks.get(in_sock) == zmq.POLLIN: pull_sock = in_sock elif socks.get(ctrl_sock) == zmq.POLLIN: pull_sock = ctrl_sock else: - self.logger.error('received message from unknown socket: %s' % socks) + # no message received, pass + continue + if self.use_event_loop or pull_sock == ctrl_sock: with TimeContext('handling message', self.logger): self.is_handler_done.clear() @@ -450,10 +442,13 @@ def _run(self, ctx): self.logger.warning( 'received a new message but since "use_event_loop=False" I will not handle it. ' 'I will just block the thread until "is_handler_done" is set!') + # wait until some one else call is_handler_done.set() self.is_handler_done.wait() + # clear the handler status self.is_handler_done.clear() - if self.args.dump_interval == 0: - self.dump() + + # block the event loop if a dump is needed + self.dump() except EventLoopEnd: self.logger.info('break from the event loop') except ComponentNotLoad: @@ -466,6 +461,8 @@ def _run(self, ctx): in_sock.close() out_sock.close() ctrl_sock.close() + # do not check dump_interval constraint as the last dump before close + self.dump(respect_dump_interval=False) self.logger.critical('terminated') def post_init(self): diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 6d62bae0..a60b497b 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -131,11 +131,6 @@ def get_response(num_recv, blocked=False): zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) self.pending_request += 1 - num_recv = max(self.pending_request - self.args.max_pending_request, 1) - - # switch to blocked recv when too many pending requests - yield from get_response(num_recv, num_recv > 1) - yield from get_response(self.pending_request, blocked=True) class ZmqContext: diff --git a/tests/test_service_mgr.py b/tests/test_service_mgr.py index 224233ae..c7af3726 100644 --- a/tests/test_service_mgr.py +++ b/tests/test_service_mgr.py @@ -97,8 +97,6 @@ def test_external_module(self): with ServiceManager(RouterService, args): pass - self.assertTrue(os.path.exists('foo_contrib_encoder.bin')) - os.remove('foo_contrib_encoder.bin') def test_override_module(self): args = set_indexer_parser().parse_args([ @@ -108,8 +106,6 @@ def test_override_module(self): with ServiceManager(IndexerService, args): pass - self.assertTrue(os.path.exists('foo_contrib_encoder.bin')) - os.remove('foo_contrib_encoder.bin') def test_override_twice_module(self): args = set_indexer_parser().parse_args([ @@ -120,8 +116,6 @@ def test_override_twice_module(self): with ServiceManager(IndexerService, args): pass - self.assertTrue(os.path.exists('foo_contrib_encoder.bin')) - os.remove('foo_contrib_encoder.bin') def test_grpc_with_pub(self): self._test_grpc_multiple_pub('thread', 1)