Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
fix(proto): fix merge route logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Sep 12, 2019
1 parent e9f065c commit 3db3444
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 57 deletions.
2 changes: 1 addition & 1 deletion gnes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@

# do not change this line manually
# this is managed by shell/make-proto.sh and updated on every execution
__proto_version__ = '0.0.7'
__proto_version__ = '0.0.8'
3 changes: 3 additions & 0 deletions gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def set_composer_flask_parser(parser=None):
def set_service_parser(parser=None):
from ..service.base import SocketType, BaseService, ParallelType
import random
import uuid
if not parser:
parser = set_base_parser()
min_port, max_port = 49152, 65536
Expand Down Expand Up @@ -154,6 +155,8 @@ def set_service_parser(parser=None):
parser.add_argument('--check_version', action='store_true', default=False,
help='comparing the GNES and proto version of incoming message with local setup, '
'mismatch raise an exception')
parser.add_argument('--identity', type=str, default=str(uuid.uuid4()).split('-')[0],
help='identity of the service, by default a random uuid string')
return parser


Expand Down
1 change: 0 additions & 1 deletion gnes/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class ZmqClient:

def __init__(self, args):
self.args = args
self.identity = args.identity if 'identity' in args else None
self.logger = set_logger(self.__class__.__name__, self.args.verbose)
self.ctx = zmq.Context()
self.ctx.setsockopt(zmq.LINGER, 0)
Expand Down
19 changes: 7 additions & 12 deletions gnes/proto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,18 @@ def router2str(m: 'gnes_pb2.Message') -> str:
return colored('▸', 'green').join(route_str)


def add_route(evlp: 'gnes_pb2.Envelope', name: str):
def add_route(evlp: 'gnes_pb2.Envelope', name: str, identity: str):
r = evlp.routes.add()
r.service = name
r.start_time.GetCurrentTime()
r.service_identity = identity


def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message'], idx: int = -1):
r = msg.envelope.routes.pop(idx)
msg.envelope.routes.extend([m.envelope.routes[idx - 1] for m in prev_msgs[:-1] if len(m.envelope.routes) > 1])
msg.envelope.routes.extend([r])

# r.first_start_time.CopyFrom(
# sorted((m.envelope.routes[idx].start_time for m in prev_msgs),
# key=lambda x: (x.seconds, x.nanos))[0])
# r.last_end_time.CopyFrom(
# sorted((m.envelope.routes[idx].end_time for m in prev_msgs),
# key=lambda x: (x.seconds, x.nanos), reverse=True)[0])
def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message']):
# take unique routes by service identity
routes = {r.service_identity: r for m in prev_msgs for r in m.envelope.routes}
msg.envelope.ClearField('routes')
msg.envelope.routes.extend(sorted(routes.values(), key=lambda x: (x.start_time.seconds, x.start_time.nanos)))


def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1) -> None:
Expand Down
1 change: 1 addition & 0 deletions gnes/proto/gnes.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ message Envelope {
google.protobuf.Timestamp end_time = 3;
google.protobuf.Timestamp first_start_time = 4;
google.protobuf.Timestamp last_end_time = 5;
string service_identity = 6;
}
repeated route routes = 6;

Expand Down
77 changes: 42 additions & 35 deletions gnes/proto/gnes_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions gnes/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,12 @@ def __init__(self, args):
if 'py_path' in args and args.py_path:
PathImporter.add_modules(*args.py_path)
self.args = args
self.logger = set_logger(self.__class__.__name__, self.args.verbose)
self.logger = set_logger(self.__class__.__name__, args.verbose)
self.is_ready = self._get_event()
self.is_event_loop = self._get_event()
self.is_model_changed = self._get_event()
self.is_handler_done = self._get_event()
self._model = None
self.identity = args.identity if 'identity' in args else None
self.use_event_loop = True
self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl)

Expand Down Expand Up @@ -347,7 +346,7 @@ def _hook_sort_response(self, msg: 'gnes_pb2.Message', *args, **kwargs):

@handler.register_hook(hook_type='pre')
def _hook_add_route(self, msg: 'gnes_pb2.Message', *args, **kwargs):
add_route(msg.envelope, self._model.__class__.__name__)
add_route(msg.envelope, self._model.__class__.__name__, self.args.identity)
self._msg_old_type = msg.WhichOneof('body')
self.logger.info('a message in type: %s with route: %s' % (self._msg_old_type, router2str(msg)))

Expand Down
8 changes: 3 additions & 5 deletions gnes/service/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .. import __version__, __proto_version__
from ..client.base import ZmqClient
from ..helper import set_logger
from ..proto import gnes_pb2_grpc, gnes_pb2, router2str
from ..proto import gnes_pb2_grpc, gnes_pb2, router2str, add_route


class FrontendService:
Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(self, args):

def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'):
msg = gnes_pb2.Message()
msg.envelope.client_id = zmq_client.identity if zmq_client.identity else ''
msg.envelope.client_id = zmq_client.args.identity
if body.request_id is not None:
msg.envelope.request_id = body.request_id
else:
Expand All @@ -69,9 +69,7 @@ def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'):
msg.envelope.timeout = 5000
msg.envelope.gnes_version = __version__
msg.envelope.proto_version = __proto_version__
r = msg.envelope.routes.add()
r.service = FrontendService.__name__
r.start_time.GetCurrentTime()
add_route(msg.envelope, FrontendService.__name__, self.args.identity)
msg.request.CopyFrom(body)
return msg

Expand Down

0 comments on commit 3db3444

Please sign in to comment.