Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
fix(service): send single long message rather than multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Sep 25, 2019
1 parent a2f1058 commit 8a0beec
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions gnes/proto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def merge_routes(msg: 'gnes_pb2.Message', prev_msgs: List['gnes_pb2.Message']):
msg.envelope.routes.extend(sorted(routes.values(), key=lambda x: (x.start_time.seconds, x.start_time.nanos)))


def is_valid_version(msg: 'gnes_pb2.Message'):
def check_msg_version(msg: 'gnes_pb2.Message'):
from .. import __version__, __proto_version__
if hasattr(msg.envelope, 'gnes_version'):
if not msg.envelope.gnes_version:
Expand Down Expand Up @@ -210,9 +210,11 @@ def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1,
else:
doc_bytes, chunk_bytes = extract_raw_bytes_from_msg(msg)
# now raw_bytes are removed from message, hoping for faster de/serialization
sock.send_multipart([msg.envelope.client_id.encode(), b'1', msg.SerializeToString()])
sock.send_multipart(doc_bytes)
sock.send_multipart(chunk_bytes)
sock.send_multipart(
[msg.envelope.client_id.encode(),
b'1', msg.SerializeToString(),
b'%d' % len(doc_bytes), *doc_bytes,
b'%d' % len(chunk_bytes), *chunk_bytes])
except zmq.error.Again:
raise TimeoutError(
'cannot send message to sock %s after timeout=%dms, please check the following:'
Expand All @@ -234,17 +236,25 @@ def recv_message(sock: 'zmq.Socket', timeout: int = -1, check_version: bool = Fa
sock.setsockopt(zmq.RCVTIMEO, -1)

msg = gnes_pb2.Message()
_, raw_bytes_in_separate, msg_data = sock.recv_multipart()
msg.ParseFromString(msg_data)
msg_data = sock.recv_multipart()
raw_bytes_in_separate = (msg_data[1] == b'1')
msg.ParseFromString(msg_data[2])

if check_version:
is_valid_version(msg)
check_msg_version(msg)

# now we have a barebone msg, we need to fill in data
if raw_bytes_in_separate == b'1':
doc_bytes = sock.recv_multipart()
chunk_bytes = sock.recv_multipart()
fill_raw_bytes_to_msg(msg, doc_bytes[1:], chunk_bytes[1:])
if raw_bytes_in_separate:
doc_bytes_len_pos = 3
doc_bytes_len = int(msg_data[doc_bytes_len_pos])
doc_bytes = msg_data[(doc_bytes_len_pos + 1):(doc_bytes_len_pos + 1 + doc_bytes_len)]
chunk_bytes_len_pos = doc_bytes_len_pos + 1 + doc_bytes_len
chunk_bytes_len = int(msg_data[chunk_bytes_len_pos])
chunk_bytes = msg_data[(chunk_bytes_len_pos + 1):]
if len(chunk_bytes) != chunk_bytes_len:
raise ValueError('"chunk_bytes_len"=%d in message, but the actual length is %d' % (
chunk_bytes_len, len(chunk_bytes)))
fill_raw_bytes_to_msg(msg, doc_bytes, chunk_bytes)
return msg

except ValueError:
Expand Down

0 comments on commit 8a0beec

Please sign in to comment.