From 46f3048461cc63d50449c54cb9f03ec663c45946 Mon Sep 17 00:00:00 2001 From: felix Date: Sat, 12 Oct 2019 18:09:37 +0800 Subject: [PATCH 01/12] marking pyzmq version --- gnes/client/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gnes/client/base.py b/gnes/client/base.py index aaaa6862..11286ba6 100644 --- a/gnes/client/base.py +++ b/gnes/client/base.py @@ -77,6 +77,9 @@ def __init__(self, args): self.logger = set_logger(self.__class__.__name__, self.args.verbose) self.ctx = zmq.Context() self.ctx.setsockopt(zmq.LINGER, 0) + self.logger.info("current libzmq version is %s" % zmq.zmq_version()) + self.logger.info("current pyzmq version is %s" % zmq.__version__) + self.receiver, recv_addr = build_socket( self.ctx, self.args.host_in, self.args.port_in, self.args.socket_in, self.args.identity) From 2a728f8354bba133bc151b151d46be3927ccdc8d Mon Sep 17 00:00:00 2001 From: felix Date: Sat, 12 Oct 2019 18:23:13 +0800 Subject: [PATCH 02/12] temp commit --- gnes/service/frontend.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 051f5350..95f75dd4 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -113,25 +113,34 @@ def Search(self, request, context): return self.Call(request, context) def StreamCall(self, request_iterator, context): - self.pending_request = 0 + pending_request = 0 def get_response(num_recv, blocked=False): for _ in range(num_recv): if blocked or zmq_client.receiver.poll(1): + if blocked: + self.logger.info("waiting to receive response ...") msg = zmq_client.recv_message(**self.send_recv_kwargs) - self.pending_request -= 1 + if blocked: + self.logger.info('response is received!') + pending_request -= 1 yield self.remove_envelope(msg) with self.zmq_context as zmq_client: for request in request_iterator: - num_recv = max(self.pending_request - self.args.max_pending_request, 1) - yield from get_response(num_recv, num_recv > 1) + self.logger.info("get request: %d" % request.request_id) + num_recv = max(pending_request - self.args.max_pending_request, 1) + # yield from get_response(num_recv, num_recv > 1) + yield from get_response(max(int(pending_request / 2), 1), blocked=False) + self.logger.info("start to send request: %d (%d) ..." % (request.request_id, pending_request)) zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) - self.pending_request += 1 + self.logger.info("request has been send out!") + pending_request += 1 - yield from get_response(self.pending_request, blocked=True) + self.logger.info("all requests are appended, waiting for responses ...") + yield from get_response(pending_request, blocked=True) class ZmqContext: """The zmq context class.""" From 3c3c54b5eb57213c20b50c6679c74750dc5d2306 Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 12:54:41 +0800 Subject: [PATCH 03/12] feat(webp-encoder): support webp encoder --- gnes/preprocessor/io_utils/webp.py | 34 ++++++++++++++++++++++++ gnes/preprocessor/video/video_encoder.py | 7 ++--- 2 files changed, 38 insertions(+), 3 deletions(-) create mode 100644 gnes/preprocessor/io_utils/webp.py diff --git a/gnes/preprocessor/io_utils/webp.py b/gnes/preprocessor/io_utils/webp.py new file mode 100644 index 00000000..12d86e0a --- /dev/null +++ b/gnes/preprocessor/io_utils/webp.py @@ -0,0 +1,34 @@ +# Tencent is pleased to support the open source community by making GNES available. +# +# Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List +import webp +from PIL import Image + +def encode_video(images: List['np.ndarray'], frame_rate: int, pix_fmt: str = 'rgb24'): + height, width, channels = images[0].shape + if pix_fmt == 'rgb24': + pix_fmt = 'RGB' + # Save an animation + enc = webp.WebPAnimEncoder.new(width, height) + timestamp_ms = 0 + duration = 1000 // frame_rate + for x in images: + img = Image.fromarray(x.copy(), pix_fmt) + pic = webp.WebPPicture.from_pil(img) + enc.encode_frame(pic, timestamp_ms) + timestamp_ms += duration + anim_data = enc.assemble(timestamp_ms) + return bytes(anim_data.buffer()) diff --git a/gnes/preprocessor/video/video_encoder.py b/gnes/preprocessor/video/video_encoder.py index 0a22ca3d..1376a8c5 100644 --- a/gnes/preprocessor/video/video_encoder.py +++ b/gnes/preprocessor/video/video_encoder.py @@ -15,7 +15,7 @@ from ...proto import gnes_pb2, blob2array from ..base import BaseVideoPreprocessor -from ..io_utils import video, gif +from ..io_utils import video, gif, webp class VideoEncoderPreprocessor(BaseVideoPreprocessor): @@ -25,16 +25,17 @@ def __init__(self, frame_rate: int = 10, pix_fmt: str = 'rgb24', video_format: s self.frame_rate = frame_rate self.video_format = video_format - if self.video_format not in ['mp4', 'gif']: + if self.video_format not in ['mp4', 'gif', 'webp']: raise ValueError("%s encoder has not been supported!" % (self.video_format)) - def _encode(self, images: 'np.ndarray'): encoder = None if self.video_format == 'mp4': encoder = video elif self.video_format == 'gif': encoder = gif + elif self.video_format == 'webp': + encoder = webp return encoder.encode_video(images, pix_fmt=self.pix_fmt, frame_rate=self.frame_rate) From 1c173aa484ab2532e6c7cca8cf6bed45aca0816e Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 12:57:26 +0800 Subject: [PATCH 04/12] minor frontend revision --- gnes/service/frontend.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 2481224f..41d002bb 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -117,34 +117,29 @@ def Search(self, request, context): return self.Call(request, context) def StreamCall(self, request_iterator, context): - pending_request = 0 + self.pending_request = 0 def get_response(num_recv, blocked=False): for _ in range(num_recv): if blocked or zmq_client.receiver.poll(1): - if blocked: - self.logger.info("waiting to receive response ...") msg = zmq_client.recv_message(**self.send_recv_kwargs) - if blocked: - self.logger.info('response is received!') - pending_request -= 1 + self.pending_request -= 1 yield self.remove_envelope(msg) with self.zmq_context as zmq_client: for request in request_iterator: self.logger.info("get request: %d" % request.request_id) - num_recv = max(pending_request - self.args.max_pending_request, 1) - # yield from get_response(num_recv, num_recv > 1) - yield from get_response(max(int(pending_request / 2), 1), blocked=False) + num_recv = max(self.pending_request - self.args.max_pending_request, 1) + yield from get_response(num_recv, num_recv > 1) - self.logger.info("start to send request: %d (%d) ..." % (request.request_id, pending_request)) + # self.logger.info("start to send request: %d (%d) ..." % (request.request_id, self.pending_request)) zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) - self.logger.info("request has been send out!") - pending_request += 1 + # self.logger.info("request has been send out!") + self.pending_request += 1 self.logger.info("all requests are appended, waiting for responses ...") - yield from get_response(pending_request, blocked=True) + yield from get_response(self.pending_request, blocked=True) class ZmqContext: """The zmq context class.""" From b80ce79748a1b88272949c55f35f8b6c778af813 Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 13:01:51 +0800 Subject: [PATCH 05/12] fix ... --- gnes/preprocessor/io_utils/webp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gnes/preprocessor/io_utils/webp.py b/gnes/preprocessor/io_utils/webp.py index 12d86e0a..6b6d8cb3 100644 --- a/gnes/preprocessor/io_utils/webp.py +++ b/gnes/preprocessor/io_utils/webp.py @@ -18,7 +18,7 @@ from PIL import Image def encode_video(images: List['np.ndarray'], frame_rate: int, pix_fmt: str = 'rgb24'): - height, width, channels = images[0].shape + height, width, _ = images[0].shape if pix_fmt == 'rgb24': pix_fmt = 'RGB' # Save an animation From 2045a542969eb76ce8aff49956295ba170d6bf28 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Mon, 14 Oct 2019 16:57:33 +0800 Subject: [PATCH 06/12] fix(style): double quote to single quote --- gnes/client/base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gnes/client/base.py b/gnes/client/base.py index 77ec9c8b..513dc8e9 100644 --- a/gnes/client/base.py +++ b/gnes/client/base.py @@ -78,8 +78,7 @@ def __init__(self, args): self.logger = set_logger(self.__class__.__name__, self.args.verbose) self.ctx = zmq.Context() self.ctx.setsockopt(zmq.LINGER, 0) - self.logger.info("current libzmq version is %s" % zmq.zmq_version()) - self.logger.info("current pyzmq version is %s" % zmq.__version__) + self.logger.info('current libzmq version is %s, pyzmq version is %s' % (zmq.zmq_version(), zmq.__version__)) self.receiver, recv_addr = build_socket( self.ctx, self.args.host_in, self.args.port_in, From 8911314b6ac90dba88ec9efe805182fb575e1e03 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Mon, 14 Oct 2019 16:59:17 +0800 Subject: [PATCH 07/12] fix(style): double quote to single quote --- gnes/service/frontend.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 41d002bb..54617a45 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -129,16 +129,14 @@ def get_response(num_recv, blocked=False): with self.zmq_context as zmq_client: for request in request_iterator: - self.logger.info("get request: %d" % request.request_id) + self.logger.info('send request: %s' % request.request_id) num_recv = max(self.pending_request - self.args.max_pending_request, 1) yield from get_response(num_recv, num_recv > 1) - # self.logger.info("start to send request: %d (%d) ..." % (request.request_id, self.pending_request)) zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) - # self.logger.info("request has been send out!") self.pending_request += 1 - self.logger.info("all requests are appended, waiting for responses ...") + self.logger.info('all requests are sent, waiting for the responses...') yield from get_response(self.pending_request, blocked=True) class ZmqContext: From 58789b5102c99230da5dd329573fe70ae8b16e72 Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 17:11:45 +0800 Subject: [PATCH 08/12] add logging for frontend service --- gnes/service/frontend.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 54617a45..9b5fa6b4 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -120,6 +120,8 @@ def StreamCall(self, request_iterator, context): self.pending_request = 0 def get_response(num_recv, blocked=False): + if blocked: + self.logger.info("waiting for %d responses ..." % (num_recv)) for _ in range(num_recv): if blocked or zmq_client.receiver.poll(1): msg = zmq_client.recv_message(**self.send_recv_kwargs) @@ -129,10 +131,10 @@ def get_response(num_recv, blocked=False): with self.zmq_context as zmq_client: for request in request_iterator: - self.logger.info('send request: %s' % request.request_id) + self.logger.info('receive request: %s' % request.request_id) num_recv = max(self.pending_request - self.args.max_pending_request, 1) yield from get_response(num_recv, num_recv > 1) - + self.logger.info("send new request into %d appending tasks" % (self.pending_request)) zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) self.pending_request += 1 From 74becb305e2aa83e71ed50b5d17208b371f787d5 Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 17:31:38 +0800 Subject: [PATCH 09/12] fix depdency issue --- gnes/preprocessor/io_utils/webp.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gnes/preprocessor/io_utils/webp.py b/gnes/preprocessor/io_utils/webp.py index 6b6d8cb3..741e6cfa 100644 --- a/gnes/preprocessor/io_utils/webp.py +++ b/gnes/preprocessor/io_utils/webp.py @@ -14,10 +14,11 @@ # limitations under the License. from typing import List -import webp -from PIL import Image def encode_video(images: List['np.ndarray'], frame_rate: int, pix_fmt: str = 'rgb24'): + import webp + from PIL import Image + height, width, _ = images[0].shape if pix_fmt == 'rgb24': pix_fmt = 'RGB' From 4a5412bf06dd9d3000ff7908dc44e8b1ef7a8fdf Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 17:32:12 +0800 Subject: [PATCH 10/12] minor fix --- gnes/service/frontend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 9b5fa6b4..f88b2128 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -121,7 +121,7 @@ def StreamCall(self, request_iterator, context): def get_response(num_recv, blocked=False): if blocked: - self.logger.info("waiting for %d responses ..." % (num_recv)) + self.logger.info('waiting for %d responses ...' % (num_recv)) for _ in range(num_recv): if blocked or zmq_client.receiver.poll(1): msg = zmq_client.recv_message(**self.send_recv_kwargs) @@ -134,7 +134,7 @@ def get_response(num_recv, blocked=False): self.logger.info('receive request: %s' % request.request_id) num_recv = max(self.pending_request - self.args.max_pending_request, 1) yield from get_response(num_recv, num_recv > 1) - self.logger.info("send new request into %d appending tasks" % (self.pending_request)) + self.logger.info('send new request into %d appending tasks' % (self.pending_request)) zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) self.pending_request += 1 From fb79cf55b9b7a7ea70ff2207f7b6c4575209890f Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 18:22:52 +0800 Subject: [PATCH 11/12] fix unittest --- tests/test_dict_indexer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_dict_indexer.py b/tests/test_dict_indexer.py index ee158fe0..911b6b46 100644 --- a/tests/test_dict_indexer.py +++ b/tests/test_dict_indexer.py @@ -50,7 +50,7 @@ def test_pymode(self): '--port_out', str(args.port_in), '--socket_in', str(SocketType.PULL_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), - '--yaml_path', '!DictIndexer {gnes_config: {name: dummy_dict_indexer}}', + '--yaml_path', '!DictIndexer {gnes_config: {name: dummy_dict_indexer.bin}}', ]) with ServiceManager(IndexerService, e_args), \ From 5121d65a57dcc9a0c77b77716be8a262dbc31bd0 Mon Sep 17 00:00:00 2001 From: felix Date: Mon, 14 Oct 2019 18:40:44 +0800 Subject: [PATCH 12/12] revert unittest --- tests/test_dict_indexer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_dict_indexer.py b/tests/test_dict_indexer.py index 911b6b46..ee158fe0 100644 --- a/tests/test_dict_indexer.py +++ b/tests/test_dict_indexer.py @@ -50,7 +50,7 @@ def test_pymode(self): '--port_out', str(args.port_in), '--socket_in', str(SocketType.PULL_CONNECT), '--socket_out', str(SocketType.PUSH_CONNECT), - '--yaml_path', '!DictIndexer {gnes_config: {name: dummy_dict_indexer.bin}}', + '--yaml_path', '!DictIndexer {gnes_config: {name: dummy_dict_indexer}}', ]) with ServiceManager(IndexerService, e_args), \