diff --git a/gnes/client/stream.py b/gnes/client/stream.py index 4c7b423c..7adde956 100644 --- a/gnes/client/stream.py +++ b/gnes/client/stream.py @@ -44,27 +44,36 @@ class StreamingClient(GrpcClient): def __init__(self, args): super().__init__(args) - self._request_queue = queue.Queue() + self._request_queue = queue.Queue(maxsize=1000) self._is_streaming = threading.Event() self._dispatch_thread = threading.Thread(target=self._start) - self._dispatch_thread.setDaemon(1) - self._dispatch_thread.start() + self._dispatch_thread.setDaemon(True) def send_request(self, request): - self._request_queue.put(request) + self._request_queue.put(request, block=True) + + # create a new streaming call + if not self._is_streaming.is_set(): + self._dispatch_thread.start() def _start(self): self._is_streaming.set() - response_stream = self.stream_call(self._request_generator()) + self.stream_call(self._request_generator()) + self._is_streaming.clear() def _request_generator(self): while self._is_streaming.is_set(): try: - request = self._request_queue.get(block=True, timeout=1.0) + request = self._request_queue.get(block=True, timeout=5.0) + if request is None: + break yield request except queue.Empty: - pass + continue + except Exception as e: + print('exception: %s' % str(e)) + break @handler.register(NotImplementedError) def _handler_default(self, resp: 'gnes_pb2.Response'):