Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
refactor(service): minimize event loop, move handling to handler
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Sep 6, 2019
1 parent 0f04877 commit f5dac7a
Showing 1 changed file with 53 additions and 46 deletions.
99 changes: 53 additions & 46 deletions gnes/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ def decorator(f):

return decorator

def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]], verbose: bool,
*args, **kwargs):
def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]], *args, **kwargs):
"""
All post handler hooks are called after the handler is done but before
sending out the message to the next service.
Expand All @@ -194,10 +193,10 @@ def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]],
raise TypeError('hook_type is in bad type: %s' % type(hook_type))

for fn, only_verbose in hooks:
if (only_verbose and verbose) or (not only_verbose):
if (only_verbose and self.service_context.args.verbose) or (not only_verbose):
fn(self.service_context, msg, *args, **kwargs)

def get_serve_fn(self, msg: 'gnes_pb2.Message'):
def call_routes(self, msg: 'gnes_pb2.Message'):
def get_default_fn(m_type):
self.logger.warning('cant find handler for message type: %s, fall back to the default handler' % m_type)
f = self.routes.get(m_type, self.routes[NotImplementedError])
Expand All @@ -216,7 +215,32 @@ def get_default_fn(m_type):
fn = get_default_fn(type(body))
else:
fn = get_default_fn(type(msg))
return fn

self.logger.info('handling message with %s' % fn.__name__)
return fn(self.service_context, msg)

def call_routes_send_back(self, msg: 'gnes_pb2.Message', out_sock):
try:
# NOTE that msg is mutable object, it may be modified in fn()
ret = self.call_routes(msg)
if ret is None:
# assume 'msg' is modified inside fn()
self.call_hooks(msg, hook_type='post', verbose=self.service_context.args.verbose)
send_message(out_sock, msg, timeout=self.service_context.args.timeout)
elif isinstance(ret, types.GeneratorType):
for r_msg in ret:
self.call_hooks(msg, hook_type='post', verbose=self.service_context.args.verbose)
send_message(out_sock, r_msg, timeout=self.service_context.args.timeout)
else:
raise ServiceError('unknown return type from the handler')

except BlockMessage:
pass
except EventLoopEnd:
send_message(out_sock, msg, timeout=self.service_context.args.timeout)
raise EventLoopEnd
except ServiceError as ex:
self.logger.error(ex, exc_info=True)


class ConcurrentService(type):
Expand Down Expand Up @@ -308,10 +332,10 @@ def dump(self):
self.logger.info('no dumping as "read_only" set to true.')

@handler.register_hook(hook_type='post')
def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', old_body_type: str, *args, **kwargs):
def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', *args, **kwargs):
new_type = msg.WhichOneof('body')
if new_type != old_body_type:
self.logger.warning('message body type has changed from %s to %s' % (old_body_type, new_type))
if new_type != self._msg_old_type:
self.logger.warning('message body type has changed from %s to %s' % (self._msg_old_type, new_type))

@handler.register_hook(hook_type='post')
def _hook_sort_response(self, msg: 'gnes_pb2.Message', *args, **kwargs):
Expand All @@ -325,45 +349,15 @@ def _hook_sort_response(self, msg: 'gnes_pb2.Message', *args, **kwargs):
'descending' if msg.response.search.is_big_score_similar else 'ascending'))

@handler.register_hook(hook_type=('pre', 'post'), only_when_verbose=True)
def _hook_logging_msg(self, msg: 'gnes_pb2.Message', *args, **kwargs):
def _hook_debug_msg(self, msg: 'gnes_pb2.Message', *args, **kwargs):
pass

def message_handler(self, msg: 'gnes_pb2.Message', out_sck, ctrl_sck):
try:
fn = self.handler.get_serve_fn(msg)
if fn:
add_route(msg.envelope, self._model.__class__.__name__)
self.logger.info('handling a message with route: %s using handler %s' % (router2str(msg), fn.__name__))
old_type = msg.WhichOneof('body')
if msg.request and msg.request.WhichOneof('body') and \
type(getattr(msg.request, msg.request.WhichOneof('body'))) == gnes_pb2.Request.ControlRequest:
out_sock = ctrl_sck
else:
out_sock = out_sck
try:
# NOTE that msg is mutable object, it may be modified in fn()
ret = fn(self, msg)
self.logger.info('handler %s is done' % fn.__name__)
if ret is None:
# assume 'msg' is modified inside fn()
self.handler.call_hooks(msg, hook_type='post', verbose=self.args.verbose,
old_body_type=old_type)
send_message(out_sock, msg, timeout=self.args.timeout)
elif isinstance(ret, types.GeneratorType):
for r_msg in ret:
self.handler.call_hooks(msg, hook_type='post', verbose=self.args.verbose,
old_body_type=old_type)
send_message(out_sock, r_msg, timeout=self.args.timeout)
else:
raise ServiceError('unknown return type from the handler: %s' % fn)
@handler.register_hook(hook_type='pre')
def _hook_add_route(self, msg: 'gnes_pb2.Message', *args, **kwargs):
add_route(msg.envelope, self._model.__class__.__name__)
self._msg_old_type = msg.WhichOneof('body')
self.logger.info('handling a message with route: %s using handler %s' % (router2str(msg), fn.__name__))

except BlockMessage:
pass
except EventLoopEnd:
send_message(out_sock, msg, timeout=self.args.timeout)
raise EventLoopEnd
except ServiceError as ex:
self.logger.error(ex, exc_info=True)

@zmqd.context()
def _run(self, ctx):
Expand Down Expand Up @@ -403,9 +397,22 @@ def _run(self, ctx):
if self.use_event_loop or pull_sock == ctrl_sock:
with TimeContext('handling message', self.logger):
self.is_handler_done.clear()

# receive message
msg = recv_message(pull_sock)
self.handler.call_hooks(msg, hook_type='pre', verbose=self.args.verbose)
self.message_handler(msg, out_sock, ctrl_sock)

# choose output sock
if msg.request and msg.request.WhichOneof('body') and \
type(getattr(msg.request,
msg.request.WhichOneof('body'))) == gnes_pb2.Request.ControlRequest:
o_sock = ctrl_sock
else:
o_sock = out_sock

# call pre-hooks
self.handler.call_hooks(msg, hook_type='pre')
# call main handler and send result back
self.handler.call_routes_send_back(msg, o_sock)
self.is_handler_done.set()
else:
self.logger.warning(
Expand Down

0 comments on commit f5dac7a

Please sign in to comment.