Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
test(pipeline): test pipeline load from yaml
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Aug 30, 2019
1 parent d71f67a commit 8524b1f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
11 changes: 9 additions & 2 deletions gnes/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


import zmq
from termcolor import colored

from ..helper import set_logger
from ..proto import send_message, gnes_pb2, recv_message
Expand All @@ -33,7 +34,10 @@ def __init__(self, args):
getattr(self, 'identity', None))
self.sender, send_addr = build_socket(self.ctx, self.args.host_out, self.args.port_out, self.args.socket_out,
getattr(self, 'identity', None))
self.logger.info('send via %s, receive via %s' % (send_addr, recv_addr))
self.logger.info(
'input %s:%s\t output %s:%s' % (
self.args.host_in, colored(self.args.port_in, 'yellow'),
self.args.host_out, colored(self.args.port_out, 'yellow')))

def __enter__(self):
return self
Expand All @@ -47,7 +51,10 @@ def close(self):
self.ctx.term()

def send_message(self, message: "gnes_pb2.Message", timeout: int = -1):
self.logger.info('send message: %s' % message.envelope)
send_message(self.sender, message, timeout=timeout)

def recv_message(self, timeout: int = -1) -> gnes_pb2.Message:
return recv_message(self.receiver, timeout=timeout)
r = recv_message(self.receiver, timeout=timeout)
self.logger.info('recv a message: %s' % r.envelope)
return r
3 changes: 1 addition & 2 deletions gnes/service/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'):
return msg

def remove_envelope(self, m: 'gnes_pb2.Message'):
resp = m.response or m.request
resp = m.response
resp.request_id = m.envelope.request_id
self.logger.info('unpacking a message and return to client: %s' % router2str(m))
return resp

def Call(self, request, context):
self.logger.info('received a new request')
with self.zmq_context as zmq_client:
zmq_client.send_message(self.add_envelope(request, zmq_client), self.args.timeout)
return self.remove_envelope(zmq_client.recv_message(self.args.timeout))
Expand Down

0 comments on commit 8524b1f

Please sign in to comment.