Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #292 from gnes-ai/fix-send-recv-2
Browse files Browse the repository at this point in the history
fix(service): fix send/recv for better compatibility
  • Loading branch information
mergify[bot] authored Sep 25, 2019
2 parents d404eaf + 30e763b commit 8495332
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
8 changes: 8 additions & 0 deletions .drone-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ steps:
from_secret: BADGE_WEBHOOK
BOT_URL:
from_secret: BOT_URL
GITHUB_USER:
from_secret: GITHUB_USER
GITHUB_PWD:
from_secret: GITHUB_PWD
commands:
- ./docker-build.sh
- docker system prune -a -f
Expand Down Expand Up @@ -114,6 +118,10 @@ steps:
from_secret: BADGE_WEBHOOK
BOT_URL:
from_secret: BOT_URL
GITHUB_USER:
from_secret: GITHUB_USER
GITHUB_PWD:
from_secret: GITHUB_PWD
commands:
- ./docker-build.sh

Expand Down
12 changes: 12 additions & 0 deletions docker-build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ do
login_push ${HUB_USER} ${HUB_PWD} " " gnes
fi

if [[ -z "${GITHUB_USER}" ]]; then
printf "\$GITHUB_USER not set, pass\n"
else
login_push ${GITHUB_USER} ${GITHUB_PWD} docker.pkg.github.com docker.pkg.github.com/gnes-ai/gnes
fi

if [[ -z "${TCLOUD_USER}" ]]; then
printf "\$TCLOUD_USER not set, pass\n"
else
Expand All @@ -76,6 +82,12 @@ else
docker tag ${ALPINE_TAG} ccr.ccs.tencentyun.com/gnes/${DEFAULT_TAG} && docker push ccr.ccs.tencentyun.com/gnes/${DEFAULT_TAG}
fi

if [[ -z "${GITHUB_USER}" ]]; then
printf "\$GITHUB_USER not set, pass\n"
else
docker tag ${ALPINE_TAG} docker.pkg.github.com/gnes-ai/gnes/${DEFAULT_TAG} && docker push docker.pkg.github.com/gnes-ai/gnes/${DEFAULT_TAG}
fi

if [[ -z "${BADGE_WEBHOOK}" ]]; then
printf "\$BADGE_WEBHOOK not set, pass\n"
else
Expand Down
31 changes: 13 additions & 18 deletions gnes/proto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,13 @@ def extract_bytes_from_msg(msg: 'gnes_pb2.Message') -> Tuple:


def fill_raw_bytes_to_msg(msg: 'gnes_pb2.Message', msg_data: List[bytes]):
doc_byte_type = msg_data[3].decode()
chunk_byte_type = msg_data[4].decode()
doc_bytes_len = int(msg_data[5])
chunk_bytes_len = int(msg_data[6])
doc_byte_type = msg_data[2].decode()
chunk_byte_type = msg_data[3].decode()
doc_bytes_len = int(msg_data[4])
chunk_bytes_len = int(msg_data[5])

doc_bytes = msg_data[7:(7 + doc_bytes_len)]
chunk_bytes = msg_data[(7 + doc_bytes_len):]
doc_bytes = msg_data[6:(6 + doc_bytes_len)]
chunk_bytes = msg_data[(6 + doc_bytes_len):]

if len(chunk_bytes) != chunk_bytes_len:
raise ValueError('"chunk_bytes_len"=%d in message, but the actual length is %d' % (
Expand Down Expand Up @@ -261,16 +261,16 @@ def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1,
sock.setsockopt(zmq.SNDTIMEO, -1)

if not squeeze_pb:
sock.send_multipart([msg.envelope.client_id.encode(), b'0', msg.SerializeToString()])
sock.send_multipart([msg.envelope.client_id.encode(), msg.SerializeToString()])
else:
doc_bytes, doc_byte_type, chunk_bytes, chunk_byte_type = extract_bytes_from_msg(msg)
# now raw_bytes are removed from message, hoping for faster de/serialization
sock.send_multipart(
[msg.envelope.client_id.encode(), # 0
b'1', msg.SerializeToString(), # 1, 2
doc_byte_type, chunk_byte_type, # 3, 4
b'%d' % len(doc_bytes), b'%d' % len(chunk_bytes), # 5, 6
*doc_bytes, *chunk_bytes]) # 7, 8
msg.SerializeToString(), # 1
doc_byte_type, chunk_byte_type, # 2, 3
b'%d' % len(doc_bytes), b'%d' % len(chunk_bytes), # 4, 5
*doc_bytes, *chunk_bytes]) # 6, 7
except zmq.error.Again:
raise TimeoutError(
'cannot send message to sock %s after timeout=%dms, please check the following:'
Expand All @@ -284,7 +284,6 @@ def send_message(sock: 'zmq.Socket', msg: 'gnes_pb2.Message', timeout: int = -1,

def recv_message(sock: 'zmq.Socket', timeout: int = -1, check_version: bool = False, **kwargs) -> Optional[
'gnes_pb2.Message']:
response = []
try:
if timeout > 0:
sock.setsockopt(zmq.RCVTIMEO, timeout)
Expand All @@ -293,19 +292,15 @@ def recv_message(sock: 'zmq.Socket', timeout: int = -1, check_version: bool = Fa

msg = gnes_pb2.Message()
msg_data = sock.recv_multipart()
squeeze_pb = (msg_data[1] == b'1')
msg.ParseFromString(msg_data[2])

msg.ParseFromString(msg_data[1])
if check_version:
check_msg_version(msg)

# now we have a barebone msg, we need to fill in data
if squeeze_pb:
if len(msg_data) > 2:
fill_raw_bytes_to_msg(msg, msg_data)
return msg

except ValueError:
raise ValueError('received a wrongly-formatted request (expected 4 frames, got %d)' % len(response))
except zmq.error.Again:
raise TimeoutError(
'no response from sock %s after timeout=%dms, please check the following:'
Expand Down

0 comments on commit 8495332

Please sign in to comment.