From 8524b1f0f78925ea6aadd59d0648884af8d9639f Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Fri, 30 Aug 2019 15:26:28 +0800 Subject: [PATCH] test(pipeline): test pipeline load from yaml --- gnes/client/base.py | 11 +++++++++-- gnes/service/frontend.py | 3 +-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/gnes/client/base.py b/gnes/client/base.py index f01da81e..815c9e28 100644 --- a/gnes/client/base.py +++ b/gnes/client/base.py @@ -15,6 +15,7 @@ import zmq +from termcolor import colored from ..helper import set_logger from ..proto import send_message, gnes_pb2, recv_message @@ -33,7 +34,10 @@ def __init__(self, args): getattr(self, 'identity', None)) self.sender, send_addr = build_socket(self.ctx, self.args.host_out, self.args.port_out, self.args.socket_out, getattr(self, 'identity', None)) - self.logger.info('send via %s, receive via %s' % (send_addr, recv_addr)) + self.logger.info( + 'input %s:%s\t output %s:%s' % ( + self.args.host_in, colored(self.args.port_in, 'yellow'), + self.args.host_out, colored(self.args.port_out, 'yellow'))) def __enter__(self): return self @@ -47,7 +51,10 @@ def close(self): self.ctx.term() def send_message(self, message: "gnes_pb2.Message", timeout: int = -1): + self.logger.info('send message: %s' % message.envelope) send_message(self.sender, message, timeout=timeout) def recv_message(self, timeout: int = -1) -> gnes_pb2.Message: - return recv_message(self.receiver, timeout=timeout) + r = recv_message(self.receiver, timeout=timeout) + self.logger.info('recv a message: %s' % r.envelope) + return r diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index b2488e33..60edd8d4 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -73,13 +73,12 @@ def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'): return msg def remove_envelope(self, m: 'gnes_pb2.Message'): - resp = m.response or m.request + resp = m.response resp.request_id = m.envelope.request_id self.logger.info('unpacking a message and return to client: %s' % router2str(m)) return resp def Call(self, request, context): - self.logger.info('received a new request') with self.zmq_context as zmq_client: zmq_client.send_message(self.add_envelope(request, zmq_client), self.args.timeout) return self.remove_envelope(zmq_client.recv_message(self.args.timeout))