diff --git a/gnes/service/base.py b/gnes/service/base.py index 144f32fc..81fc3e2a 100644 --- a/gnes/service/base.py +++ b/gnes/service/base.py @@ -173,8 +173,7 @@ def decorator(f): return decorator - def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]], verbose: bool, - *args, **kwargs): + def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]], *args, **kwargs): """ All post handler hooks are called after the handler is done but before sending out the message to the next service. @@ -194,10 +193,10 @@ def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]], raise TypeError('hook_type is in bad type: %s' % type(hook_type)) for fn, only_verbose in hooks: - if (only_verbose and verbose) or (not only_verbose): + if (only_verbose and self.service_context.args.verbose) or (not only_verbose): fn(self.service_context, msg, *args, **kwargs) - def get_serve_fn(self, msg: 'gnes_pb2.Message'): + def call_routes(self, msg: 'gnes_pb2.Message'): def get_default_fn(m_type): self.logger.warning('cant find handler for message type: %s, fall back to the default handler' % m_type) f = self.routes.get(m_type, self.routes[NotImplementedError]) @@ -216,7 +215,32 @@ def get_default_fn(m_type): fn = get_default_fn(type(body)) else: fn = get_default_fn(type(msg)) - return fn + + self.logger.info('handling message with %s' % fn.__name__) + return fn(self.service_context, msg) + + def call_routes_send_back(self, msg: 'gnes_pb2.Message', out_sock): + try: + # NOTE that msg is mutable object, it may be modified in fn() + ret = self.call_routes(msg) + if ret is None: + # assume 'msg' is modified inside fn() + self.call_hooks(msg, hook_type='post', verbose=self.service_context.args.verbose) + send_message(out_sock, msg, timeout=self.service_context.args.timeout) + elif isinstance(ret, types.GeneratorType): + for r_msg in ret: + self.call_hooks(msg, hook_type='post', verbose=self.service_context.args.verbose) + send_message(out_sock, r_msg, timeout=self.service_context.args.timeout) + else: + raise ServiceError('unknown return type from the handler') + + except BlockMessage: + pass + except EventLoopEnd: + send_message(out_sock, msg, timeout=self.service_context.args.timeout) + raise EventLoopEnd + except ServiceError as ex: + self.logger.error(ex, exc_info=True) class ConcurrentService(type): @@ -308,10 +332,10 @@ def dump(self): self.logger.info('no dumping as "read_only" set to true.') @handler.register_hook(hook_type='post') - def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', old_body_type: str, *args, **kwargs): + def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', *args, **kwargs): new_type = msg.WhichOneof('body') - if new_type != old_body_type: - self.logger.warning('message body type has changed from %s to %s' % (old_body_type, new_type)) + if new_type != self._msg_old_type: + self.logger.warning('message body type has changed from %s to %s' % (self._msg_old_type, new_type)) @handler.register_hook(hook_type='post') def _hook_sort_response(self, msg: 'gnes_pb2.Message', *args, **kwargs): @@ -325,45 +349,15 @@ def _hook_sort_response(self, msg: 'gnes_pb2.Message', *args, **kwargs): 'descending' if msg.response.search.is_big_score_similar else 'ascending')) @handler.register_hook(hook_type=('pre', 'post'), only_when_verbose=True) - def _hook_logging_msg(self, msg: 'gnes_pb2.Message', *args, **kwargs): + def _hook_debug_msg(self, msg: 'gnes_pb2.Message', *args, **kwargs): pass - def message_handler(self, msg: 'gnes_pb2.Message', out_sck, ctrl_sck): - try: - fn = self.handler.get_serve_fn(msg) - if fn: - add_route(msg.envelope, self._model.__class__.__name__) - self.logger.info('handling a message with route: %s using handler %s' % (router2str(msg), fn.__name__)) - old_type = msg.WhichOneof('body') - if msg.request and msg.request.WhichOneof('body') and \ - type(getattr(msg.request, msg.request.WhichOneof('body'))) == gnes_pb2.Request.ControlRequest: - out_sock = ctrl_sck - else: - out_sock = out_sck - try: - # NOTE that msg is mutable object, it may be modified in fn() - ret = fn(self, msg) - self.logger.info('handler %s is done' % fn.__name__) - if ret is None: - # assume 'msg' is modified inside fn() - self.handler.call_hooks(msg, hook_type='post', verbose=self.args.verbose, - old_body_type=old_type) - send_message(out_sock, msg, timeout=self.args.timeout) - elif isinstance(ret, types.GeneratorType): - for r_msg in ret: - self.handler.call_hooks(msg, hook_type='post', verbose=self.args.verbose, - old_body_type=old_type) - send_message(out_sock, r_msg, timeout=self.args.timeout) - else: - raise ServiceError('unknown return type from the handler: %s' % fn) + @handler.register_hook(hook_type='pre') + def _hook_add_route(self, msg: 'gnes_pb2.Message', *args, **kwargs): + add_route(msg.envelope, self._model.__class__.__name__) + self._msg_old_type = msg.WhichOneof('body') + self.logger.info('handling a message with route: %s using handler %s' % (router2str(msg), fn.__name__)) - except BlockMessage: - pass - except EventLoopEnd: - send_message(out_sock, msg, timeout=self.args.timeout) - raise EventLoopEnd - except ServiceError as ex: - self.logger.error(ex, exc_info=True) @zmqd.context() def _run(self, ctx): @@ -403,9 +397,22 @@ def _run(self, ctx): if self.use_event_loop or pull_sock == ctrl_sock: with TimeContext('handling message', self.logger): self.is_handler_done.clear() + + # receive message msg = recv_message(pull_sock) - self.handler.call_hooks(msg, hook_type='pre', verbose=self.args.verbose) - self.message_handler(msg, out_sock, ctrl_sock) + + # choose output sock + if msg.request and msg.request.WhichOneof('body') and \ + type(getattr(msg.request, + msg.request.WhichOneof('body'))) == gnes_pb2.Request.ControlRequest: + o_sock = ctrl_sock + else: + o_sock = out_sock + + # call pre-hooks + self.handler.call_hooks(msg, hook_type='pre') + # call main handler and send result back + self.handler.call_routes_send_back(msg, o_sock) self.is_handler_done.set() else: self.logger.warning(