diff --git a/gnes/__init__.py b/gnes/__init__.py index 40428f59..224ca4e4 100644 --- a/gnes/__init__.py +++ b/gnes/__init__.py @@ -20,4 +20,4 @@ # do not change this line manually # this is managed by shell/make-proto.sh and updated on every execution -__proto_version__ = '0.0.7' +__proto_version__ = '0.0.8' diff --git a/gnes/cli/parser.py b/gnes/cli/parser.py index 442e08a1..e5a15c57 100644 --- a/gnes/cli/parser.py +++ b/gnes/cli/parser.py @@ -116,6 +116,7 @@ def set_composer_flask_parser(parser=None): def set_service_parser(parser=None): from ..service.base import SocketType, BaseService, ParallelType import random + import uuid if not parser: parser = set_base_parser() min_port, max_port = 49152, 65536 @@ -154,6 +155,8 @@ def set_service_parser(parser=None): parser.add_argument('--check_version', action='store_true', default=False, help='comparing the GNES and proto version of incoming message with local setup, ' 'mismatch raise an exception') + parser.add_argument('--identity', type=str, default=str(uuid.uuid4()).split('-')[0], + help='identity of the service, by default a random uuid string') return parser diff --git a/gnes/client/base.py b/gnes/client/base.py index 5ecb7491..d2dd6448 100644 --- a/gnes/client/base.py +++ b/gnes/client/base.py @@ -28,7 +28,6 @@ class ZmqClient: def __init__(self, args): self.args = args - self.identity = args.identity if 'identity' in args else None self.logger = set_logger(self.__class__.__name__, self.args.verbose) self.ctx = zmq.Context() self.ctx.setsockopt(zmq.LINGER, 0) diff --git a/gnes/proto/__init__.py b/gnes/proto/__init__.py index 5cc8cee5..124b9f16 100644 --- a/gnes/proto/__init__.py +++ b/gnes/proto/__init__.py @@ -107,23 +107,18 @@ def router2str(m: 'gnes_pb2.Message') -> str: return colored('▸', 'green').join(route_str) -def add_route(evlp: 'gnes_pb2.Envelope', name: str): +def add_route(evlp: 'gnes_pb2.Envelope', name: str, identity: str): r = evlp.routes.add() r.service = name r.start_time.GetCurrentTime() + r.service_identity = identity -def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message'], idx: int = -1): - r = msg.envelope.routes.pop(idx) - msg.envelope.routes.extend([m.envelope.routes[idx - 1] for m in prev_msgs[:-1] if len(m.envelope.routes) > 1]) - msg.envelope.routes.extend([r]) - - # r.first_start_time.CopyFrom( - # sorted((m.envelope.routes[idx].start_time for m in prev_msgs), - # key=lambda x: (x.seconds, x.nanos))[0]) - # r.last_end_time.CopyFrom( - # sorted((m.envelope.routes[idx].end_time for m in prev_msgs), - # key=lambda x: (x.seconds, x.nanos), reverse=True)[0]) +def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message']): + # take unique routes by service identity + routes = {r.service_identity: r for m in prev_msgs for r in m.envelope.routes} + msg.envelope.ClearField('routes') + msg.envelope.routes.extend(sorted(routes.values(), key=lambda x: (x.start_time.seconds, x.start_time.nanos))) def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1) -> None: diff --git a/gnes/proto/gnes.proto b/gnes/proto/gnes.proto index f412d49a..6cceddf9 100644 --- a/gnes/proto/gnes.proto +++ b/gnes/proto/gnes.proto @@ -93,6 +93,7 @@ message Envelope { google.protobuf.Timestamp end_time = 3; google.protobuf.Timestamp first_start_time = 4; google.protobuf.Timestamp last_end_time = 5; + string service_identity = 6; } repeated route routes = 6; diff --git a/gnes/proto/gnes_pb2.py b/gnes/proto/gnes_pb2.py index 5bbdbe10..921be909 100644 --- a/gnes/proto/gnes_pb2.py +++ b/gnes/proto/gnes_pb2.py @@ -21,7 +21,7 @@ package='gnes', syntax='proto3', serialized_options=None, - serialized_pb=_b('\n\ngnes.proto\x12\x04gnes\x1a\x1fgoogle/protobuf/timestamp.proto\"9\n\x07NdArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x11\n\x05shape\x18\x02 \x03(\rB\x02\x10\x01\x12\r\n\x05\x64type\x18\x03 \x01(\t\"\xb9\x01\n\x05\x43hunk\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x0e\n\x04text\x18\x02 \x01(\tH\x00\x12\x1d\n\x04\x62lob\x18\x03 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\r\n\x03raw\x18\x07 \x01(\x0cH\x00\x12\x0e\n\x06offset\x18\x04 \x01(\r\x12\x15\n\toffset_nd\x18\x05 \x03(\rB\x02\x10\x01\x12\x0e\n\x06weight\x18\x06 \x01(\x02\x12 \n\tembedding\x18\x08 \x01(\x0b\x32\r.gnes.NdArrayB\t\n\x07\x63ontent\"\xc4\x02\n\x08\x44ocument\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x1b\n\x06\x63hunks\x18\x02 \x03(\x0b\x32\x0b.gnes.Chunk\x12(\n\x08\x64oc_type\x18\x03 \x01(\x0e\x32\x16.gnes.Document.DocType\x12\x11\n\tmeta_info\x18\x04 \x01(\x0c\x12\x12\n\x08raw_text\x18\x05 \x01(\tH\x00\x12\"\n\traw_image\x18\x06 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\"\n\traw_video\x18\x07 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\x13\n\traw_bytes\x18\x08 \x01(\x0cH\x00\x12\x0e\n\x06weight\x18\n \x01(\x02\"A\n\x07\x44ocType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04TEXT\x10\x01\x12\t\n\x05IMAGE\x10\x02\x12\t\n\x05VIDEO\x10\x03\x12\t\n\x05\x41UDIO\x10\x04\x42\n\n\x08raw_data\"\x9a\x03\n\x08\x45nvelope\x12\x11\n\tclient_id\x18\x01 \x01(\t\x12\x12\n\nrequest_id\x18\x02 \x01(\r\x12\x0f\n\x07part_id\x18\x03 \x01(\r\x12\x10\n\x08num_part\x18\x04 \x03(\r\x12\x0f\n\x07timeout\x18\x05 \x01(\r\x12$\n\x06routes\x18\x06 \x03(\x0b\x32\x14.gnes.Envelope.route\x12\x14\n\x0cgnes_version\x18\x07 \x01(\t\x12\x15\n\rproto_version\x18\x08 \x01(\t\x1a\xdf\x01\n\x05route\x12\x0f\n\x07service\x18\x01 \x01(\t\x12.\n\nstart_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12,\n\x08\x65nd_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10\x66irst_start_time\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\rlast_end_time\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"y\n\x07Message\x12 \n\x08\x65nvelope\x18\x01 \x01(\x0b\x32\x0e.gnes.Envelope\x12 \n\x07request\x18\x02 \x01(\x0b\x32\r.gnes.RequestH\x00\x12\"\n\x08response\x18\x03 \x01(\x0b\x32\x0e.gnes.ResponseH\x00\x42\x06\n\x04\x62ody\"\xf6\x03\n\x07Request\x12\x12\n\nrequest_id\x18\x01 \x01(\r\x12+\n\x05train\x18\x02 \x01(\x0b\x32\x1a.gnes.Request.TrainRequestH\x00\x12+\n\x05index\x18\x03 \x01(\x0b\x32\x1a.gnes.Request.IndexRequestH\x00\x12,\n\x06search\x18\x04 \x01(\x0b\x32\x1a.gnes.Request.QueryRequestH\x00\x12/\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1c.gnes.Request.ControlRequestH\x00\x1a;\n\x0cTrainRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x12\r\n\x05\x66lush\x18\x02 \x01(\x08\x1a,\n\x0cIndexRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x1a<\n\x0cQueryRequest\x12\x1d\n\x05query\x18\x01 \x01(\x0b\x32\x0e.gnes.Document\x12\r\n\x05top_k\x18\x02 \x01(\r\x1am\n\x0e\x43ontrolRequest\x12\x35\n\x07\x63ommand\x18\x01 \x01(\x0e\x32$.gnes.Request.ControlRequest.Command\"$\n\x07\x43ommand\x12\r\n\tTERMINATE\x10\x00\x12\n\n\x06STATUS\x10\x01\x42\x06\n\x04\x62ody\"\xbb\x06\n\x08Response\x12\x12\n\nrequest_id\x18\x01 \x01(\r\x12-\n\x05train\x18\x02 \x01(\x0b\x32\x1c.gnes.Response.TrainResponseH\x00\x12-\n\x05index\x18\x03 \x01(\x0b\x32\x1c.gnes.Response.IndexResponseH\x00\x12.\n\x06search\x18\x04 \x01(\x0b\x32\x1c.gnes.Response.QueryResponseH\x00\x12\x31\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1e.gnes.Response.ControlResponseH\x00\x1a\x36\n\rTrainResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x36\n\rIndexResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x38\n\x0f\x43ontrolResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\xf8\x02\n\rQueryResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x12\r\n\x05top_k\x18\x02 \x01(\r\x12?\n\x0ctopk_results\x18\x03 \x03(\x0b\x32).gnes.Response.QueryResponse.ScoredResult\x12\x1c\n\x14is_big_score_similar\x18\x04 \x01(\x08\x12\x11\n\tis_sorted\x18\x05 \x01(\x08\x1a\xbe\x01\n\x0cScoredResult\x12\x1c\n\x05\x63hunk\x18\x01 \x01(\x0b\x32\x0b.gnes.ChunkH\x00\x12\x1d\n\x03\x64oc\x18\x02 \x01(\x0b\x32\x0e.gnes.DocumentH\x00\x12>\n\x05score\x18\x03 \x01(\x0b\x32/.gnes.Response.QueryResponse.ScoredResult.Score\x1a)\n\x05Score\x12\r\n\x05value\x18\x01 \x01(\x02\x12\x11\n\texplained\x18\x02 \x01(\tB\x06\n\x04\x62ody\"-\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07PENDING\x10\x02\x42\x06\n\x04\x62ody2\xe3\x01\n\x07GnesRPC\x12(\n\x05Train\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Index\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Query\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12\'\n\x04\x43\x61ll\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12\x31\n\nStreamCall\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00(\x01\x30\x01\x62\x06proto3') + serialized_pb=_b('\n\ngnes.proto\x12\x04gnes\x1a\x1fgoogle/protobuf/timestamp.proto\"9\n\x07NdArray\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x11\n\x05shape\x18\x02 \x03(\rB\x02\x10\x01\x12\r\n\x05\x64type\x18\x03 \x01(\t\"\xb9\x01\n\x05\x43hunk\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x0e\n\x04text\x18\x02 \x01(\tH\x00\x12\x1d\n\x04\x62lob\x18\x03 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\r\n\x03raw\x18\x07 \x01(\x0cH\x00\x12\x0e\n\x06offset\x18\x04 \x01(\r\x12\x15\n\toffset_nd\x18\x05 \x03(\rB\x02\x10\x01\x12\x0e\n\x06weight\x18\x06 \x01(\x02\x12 \n\tembedding\x18\x08 \x01(\x0b\x32\r.gnes.NdArrayB\t\n\x07\x63ontent\"\xc4\x02\n\x08\x44ocument\x12\x0e\n\x06\x64oc_id\x18\x01 \x01(\x04\x12\x1b\n\x06\x63hunks\x18\x02 \x03(\x0b\x32\x0b.gnes.Chunk\x12(\n\x08\x64oc_type\x18\x03 \x01(\x0e\x32\x16.gnes.Document.DocType\x12\x11\n\tmeta_info\x18\x04 \x01(\x0c\x12\x12\n\x08raw_text\x18\x05 \x01(\tH\x00\x12\"\n\traw_image\x18\x06 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\"\n\traw_video\x18\x07 \x01(\x0b\x32\r.gnes.NdArrayH\x00\x12\x13\n\traw_bytes\x18\x08 \x01(\x0cH\x00\x12\x0e\n\x06weight\x18\n \x01(\x02\"A\n\x07\x44ocType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04TEXT\x10\x01\x12\t\n\x05IMAGE\x10\x02\x12\t\n\x05VIDEO\x10\x03\x12\t\n\x05\x41UDIO\x10\x04\x42\n\n\x08raw_data\"\xb4\x03\n\x08\x45nvelope\x12\x11\n\tclient_id\x18\x01 \x01(\t\x12\x12\n\nrequest_id\x18\x02 \x01(\r\x12\x0f\n\x07part_id\x18\x03 \x01(\r\x12\x10\n\x08num_part\x18\x04 \x03(\r\x12\x0f\n\x07timeout\x18\x05 \x01(\r\x12$\n\x06routes\x18\x06 \x03(\x0b\x32\x14.gnes.Envelope.route\x12\x14\n\x0cgnes_version\x18\x07 \x01(\t\x12\x15\n\rproto_version\x18\x08 \x01(\t\x1a\xf9\x01\n\x05route\x12\x0f\n\x07service\x18\x01 \x01(\t\x12.\n\nstart_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12,\n\x08\x65nd_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10\x66irst_start_time\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x31\n\rlast_end_time\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x18\n\x10service_identity\x18\x06 \x01(\t\"y\n\x07Message\x12 \n\x08\x65nvelope\x18\x01 \x01(\x0b\x32\x0e.gnes.Envelope\x12 \n\x07request\x18\x02 \x01(\x0b\x32\r.gnes.RequestH\x00\x12\"\n\x08response\x18\x03 \x01(\x0b\x32\x0e.gnes.ResponseH\x00\x42\x06\n\x04\x62ody\"\xf6\x03\n\x07Request\x12\x12\n\nrequest_id\x18\x01 \x01(\r\x12+\n\x05train\x18\x02 \x01(\x0b\x32\x1a.gnes.Request.TrainRequestH\x00\x12+\n\x05index\x18\x03 \x01(\x0b\x32\x1a.gnes.Request.IndexRequestH\x00\x12,\n\x06search\x18\x04 \x01(\x0b\x32\x1a.gnes.Request.QueryRequestH\x00\x12/\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1c.gnes.Request.ControlRequestH\x00\x1a;\n\x0cTrainRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x12\r\n\x05\x66lush\x18\x02 \x01(\x08\x1a,\n\x0cIndexRequest\x12\x1c\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x0e.gnes.Document\x1a<\n\x0cQueryRequest\x12\x1d\n\x05query\x18\x01 \x01(\x0b\x32\x0e.gnes.Document\x12\r\n\x05top_k\x18\x02 \x01(\r\x1am\n\x0e\x43ontrolRequest\x12\x35\n\x07\x63ommand\x18\x01 \x01(\x0e\x32$.gnes.Request.ControlRequest.Command\"$\n\x07\x43ommand\x12\r\n\tTERMINATE\x10\x00\x12\n\n\x06STATUS\x10\x01\x42\x06\n\x04\x62ody\"\xbb\x06\n\x08Response\x12\x12\n\nrequest_id\x18\x01 \x01(\r\x12-\n\x05train\x18\x02 \x01(\x0b\x32\x1c.gnes.Response.TrainResponseH\x00\x12-\n\x05index\x18\x03 \x01(\x0b\x32\x1c.gnes.Response.IndexResponseH\x00\x12.\n\x06search\x18\x04 \x01(\x0b\x32\x1c.gnes.Response.QueryResponseH\x00\x12\x31\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x1e.gnes.Response.ControlResponseH\x00\x1a\x36\n\rTrainResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x36\n\rIndexResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\x38\n\x0f\x43ontrolResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x1a\xf8\x02\n\rQueryResponse\x12%\n\x06status\x18\x01 \x01(\x0e\x32\x15.gnes.Response.Status\x12\r\n\x05top_k\x18\x02 \x01(\r\x12?\n\x0ctopk_results\x18\x03 \x03(\x0b\x32).gnes.Response.QueryResponse.ScoredResult\x12\x1c\n\x14is_big_score_similar\x18\x04 \x01(\x08\x12\x11\n\tis_sorted\x18\x05 \x01(\x08\x1a\xbe\x01\n\x0cScoredResult\x12\x1c\n\x05\x63hunk\x18\x01 \x01(\x0b\x32\x0b.gnes.ChunkH\x00\x12\x1d\n\x03\x64oc\x18\x02 \x01(\x0b\x32\x0e.gnes.DocumentH\x00\x12>\n\x05score\x18\x03 \x01(\x0b\x32/.gnes.Response.QueryResponse.ScoredResult.Score\x1a)\n\x05Score\x12\r\n\x05value\x18\x01 \x01(\x02\x12\x11\n\texplained\x18\x02 \x01(\tB\x06\n\x04\x62ody\"-\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07PENDING\x10\x02\x42\x06\n\x04\x62ody2\xe3\x01\n\x07GnesRPC\x12(\n\x05Train\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Index\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12(\n\x05Query\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12\'\n\x04\x43\x61ll\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00\x12\x31\n\nStreamCall\x12\r.gnes.Request\x1a\x0e.gnes.Response\"\x00(\x01\x30\x01\x62\x06proto3') , dependencies=[google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,]) @@ -78,8 +78,8 @@ ], containing_type=None, serialized_options=None, - serialized_start=1622, - serialized_end=1658, + serialized_start=1648, + serialized_end=1684, ) _sym_db.RegisterEnumDescriptor(_REQUEST_CONTROLREQUEST_COMMAND) @@ -104,8 +104,8 @@ ], containing_type=None, serialized_options=None, - serialized_start=2443, - serialized_end=2488, + serialized_start=2469, + serialized_end=2514, ) _sym_db.RegisterEnumDescriptor(_RESPONSE_STATUS) @@ -371,6 +371,13 @@ message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='service_identity', full_name='gnes.Envelope.route.service_identity', index=5, + number=6, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -384,7 +391,7 @@ oneofs=[ ], serialized_start=815, - serialized_end=1038, + serialized_end=1064, ) _ENVELOPE = _descriptor.Descriptor( @@ -463,7 +470,7 @@ oneofs=[ ], serialized_start=628, - serialized_end=1038, + serialized_end=1064, ) @@ -510,8 +517,8 @@ name='body', full_name='gnes.Message.body', index=0, containing_type=None, fields=[]), ], - serialized_start=1040, - serialized_end=1161, + serialized_start=1066, + serialized_end=1187, ) @@ -548,8 +555,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1380, - serialized_end=1439, + serialized_start=1406, + serialized_end=1465, ) _REQUEST_INDEXREQUEST = _descriptor.Descriptor( @@ -578,8 +585,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1441, - serialized_end=1485, + serialized_start=1467, + serialized_end=1511, ) _REQUEST_QUERYREQUEST = _descriptor.Descriptor( @@ -615,8 +622,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1487, - serialized_end=1547, + serialized_start=1513, + serialized_end=1573, ) _REQUEST_CONTROLREQUEST = _descriptor.Descriptor( @@ -646,8 +653,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1549, - serialized_end=1658, + serialized_start=1575, + serialized_end=1684, ) _REQUEST = _descriptor.Descriptor( @@ -707,8 +714,8 @@ name='body', full_name='gnes.Request.body', index=0, containing_type=None, fields=[]), ], - serialized_start=1164, - serialized_end=1666, + serialized_start=1190, + serialized_end=1692, ) @@ -738,8 +745,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1894, - serialized_end=1948, + serialized_start=1920, + serialized_end=1974, ) _RESPONSE_INDEXRESPONSE = _descriptor.Descriptor( @@ -768,8 +775,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1950, - serialized_end=2004, + serialized_start=1976, + serialized_end=2030, ) _RESPONSE_CONTROLRESPONSE = _descriptor.Descriptor( @@ -798,8 +805,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2006, - serialized_end=2062, + serialized_start=2032, + serialized_end=2088, ) _RESPONSE_QUERYRESPONSE_SCOREDRESULT_SCORE = _descriptor.Descriptor( @@ -835,8 +842,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2392, - serialized_end=2433, + serialized_start=2418, + serialized_end=2459, ) _RESPONSE_QUERYRESPONSE_SCOREDRESULT = _descriptor.Descriptor( @@ -882,8 +889,8 @@ name='body', full_name='gnes.Response.QueryResponse.ScoredResult.body', index=0, containing_type=None, fields=[]), ], - serialized_start=2251, - serialized_end=2441, + serialized_start=2277, + serialized_end=2467, ) _RESPONSE_QUERYRESPONSE = _descriptor.Descriptor( @@ -940,8 +947,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2065, - serialized_end=2441, + serialized_start=2091, + serialized_end=2467, ) _RESPONSE = _descriptor.Descriptor( @@ -1002,8 +1009,8 @@ name='body', full_name='gnes.Response.body', index=0, containing_type=None, fields=[]), ], - serialized_start=1669, - serialized_end=2496, + serialized_start=1695, + serialized_end=2522, ) _CHUNK.fields_by_name['blob'].message_type = _NDARRAY @@ -1267,8 +1274,8 @@ file=DESCRIPTOR, index=0, serialized_options=None, - serialized_start=2499, - serialized_end=2726, + serialized_start=2525, + serialized_end=2752, methods=[ _descriptor.MethodDescriptor( name='Train', diff --git a/gnes/service/base.py b/gnes/service/base.py index 431100ed..18333445 100644 --- a/gnes/service/base.py +++ b/gnes/service/base.py @@ -288,13 +288,12 @@ def __init__(self, args): if 'py_path' in args and args.py_path: PathImporter.add_modules(*args.py_path) self.args = args - self.logger = set_logger(self.__class__.__name__, self.args.verbose) + self.logger = set_logger(self.__class__.__name__, args.verbose) self.is_ready = self._get_event() self.is_event_loop = self._get_event() self.is_model_changed = self._get_event() self.is_handler_done = self._get_event() self._model = None - self.identity = args.identity if 'identity' in args else None self.use_event_loop = True self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl) @@ -347,7 +346,7 @@ def _hook_sort_response(self, msg: 'gnes_pb2.Message', *args, **kwargs): @handler.register_hook(hook_type='pre') def _hook_add_route(self, msg: 'gnes_pb2.Message', *args, **kwargs): - add_route(msg.envelope, self._model.__class__.__name__) + add_route(msg.envelope, self._model.__class__.__name__, self.args.identity) self._msg_old_type = msg.WhichOneof('body') self.logger.info('a message in type: %s with route: %s' % (self._msg_old_type, router2str(msg))) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 29613bb7..2f1516f4 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -22,7 +22,7 @@ from .. import __version__, __proto_version__ from ..client.base import ZmqClient from ..helper import set_logger -from ..proto import gnes_pb2_grpc, gnes_pb2, router2str +from ..proto import gnes_pb2_grpc, gnes_pb2, router2str, add_route class FrontendService: @@ -57,7 +57,7 @@ def __init__(self, args): def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'): msg = gnes_pb2.Message() - msg.envelope.client_id = zmq_client.identity if zmq_client.identity else '' + msg.envelope.client_id = zmq_client.args.identity if body.request_id is not None: msg.envelope.request_id = body.request_id else: @@ -69,9 +69,7 @@ def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'): msg.envelope.timeout = 5000 msg.envelope.gnes_version = __version__ msg.envelope.proto_version = __proto_version__ - r = msg.envelope.routes.add() - r.service = FrontendService.__name__ - r.start_time.GetCurrentTime() + add_route(msg.envelope, FrontendService.__name__, self.args.identity) msg.request.CopyFrom(body) return msg