diff --git a/gnes/client/base.py b/gnes/client/base.py index ce4bbd66..aaaa6862 100644 --- a/gnes/client/base.py +++ b/gnes/client/base.py @@ -63,9 +63,11 @@ def get_default_fn(r_type): fn = self.routes.get(resp_type) else: fn = get_default_fn(type(resp)) - - self.logger.info('handling response with %s' % fn.__name__) - return fn(self._context, resp) + self.logger.info('handling response with %s' % fn.__name__) + return fn(self._context, resp) + else: + self.logger.warning('the received message is not response') + return None class ZmqClient: diff --git a/gnes/client/stream.py b/gnes/client/stream.py index 7adde956..6df6051e 100644 --- a/gnes/client/stream.py +++ b/gnes/client/stream.py @@ -44,7 +44,7 @@ class StreamingClient(GrpcClient): def __init__(self, args): super().__init__(args) - self._request_queue = queue.Queue(maxsize=1000) + self._request_queue = queue.Queue(maxsize=10) self._is_streaming = threading.Event() self._dispatch_thread = threading.Thread(target=self._start) @@ -63,16 +63,16 @@ def _start(self): self._is_streaming.clear() def _request_generator(self): - while self._is_streaming.is_set(): + while True: try: request = self._request_queue.get(block=True, timeout=5.0) if request is None: break yield request except queue.Empty: - continue + break except Exception as e: - print('exception: %s' % str(e)) + self.logger.error('exception: %s' % str(e)) break @handler.register(NotImplementedError) diff --git a/gnes/preprocessor/video/frame_select.py b/gnes/preprocessor/video/frame_select.py index d55aacbb..1e3fe6b6 100644 --- a/gnes/preprocessor/video/frame_select.py +++ b/gnes/preprocessor/video/frame_select.py @@ -46,6 +46,7 @@ def apply(self, doc: 'gnes_pb2.Document') -> None: else: idx = np.sort(np.random.choice(len(images), self.sframes, replace=False)) chunk.blob.CopyFrom(array2blob(images[idx])) + del images else: self.logger.error( 'bad document: "doc.chunks" is empty!') diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index b74c2727..6d62bae0 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -125,6 +125,9 @@ def get_response(num_recv, blocked=False): with self.zmq_context as zmq_client: for request in request_iterator: + num_recv = max(self.pending_request - self.args.max_pending_request, 0) + yield from get_response(num_recv, num_recv > 0) + zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) self.pending_request += 1