diff --git a/jina/serve/runtimes/worker/batch_queue.py b/jina/serve/runtimes/worker/batch_queue.py
index f0a25f4202d8f..bde16beb151c9 100644
--- a/jina/serve/runtimes/worker/batch_queue.py
+++ b/jina/serve/runtimes/worker/batch_queue.py
@@ -39,6 +39,7 @@ def __init__(
         self._timeout: int = timeout
         self._reset()
         self._flush_trigger: Event = Event()
+        self._timer_started, self._timer_finished = False, False
         self._timer_task: Optional[Task] = None
 
     def __repr__(self) -> str:
@@ -67,6 +68,7 @@ def _cancel_timer_if_pending(self):
             and not self._timer_task.done()
             and not self._timer_task.cancelled()
         ):
+            self._timer_finished = False
             self._timer_task.cancel()
 
     def _start_timer(self):
@@ -74,13 +76,14 @@ def _start_timer(self):
         self._timer_task = asyncio.create_task(
             self._sleep_then_set()
         )
-        self._timer_started = True
 
     async def _sleep_then_set(self):
        """Sleep and then set the event
        """
+        self._timer_finished = False
        await asyncio.sleep(self._timeout / 1000)
        self._flush_trigger.set()
+        self._timer_finished = True
 
     async def push(self, request: DataRequest) -> asyncio.Queue:
         """Append request to the the list of requests to be processed.
@@ -94,8 +97,10 @@ async def push(self, request: DataRequest) -> asyncio.Queue:
         """
         docs = request.docs
 
-        # writes to shared data between tasks need to be mutually exclusive
-        if not self._timer_task:
+        if not self._timer_task or self._timer_finished:
+            # If there is no timer (first arrival), or the timer is already consumed, any new push should trigger a new Timer, before
+            # this push requests the data lock. The order of accessing the data lock guarantees that this request will be put in the `big_doc`
+            # before the `flush` task processes it.
             self._start_timer()
         async with self._data_lock:
             if not self._flush_task:
@@ -216,9 +221,8 @@ def batch(iterable_1, iterable_2, n=1):
             # At this moment, we have documents concatenated in self._big_doc corresponding to requests in
             # self._requests with its lengths stored in self._requests_len. For each requests, there is a queue to
             # communicate that the request has been processed properly. At this stage the data_lock is ours and
-            # therefore noone can add requests to this list.
+            # therefore no-one can add requests to this list.
             self._flush_trigger: Event = Event()
-            self._timer_task = None
             try:
                 if not docarray_v2:
                     non_assigned_to_response_docs: DocumentArray = DocumentArray.empty()
diff --git a/tests/integration/docarray_v2/test_v2.py b/tests/integration/docarray_v2/test_v2.py
index da82bb52479e4..a40f1d745be1f 100644
--- a/tests/integration/docarray_v2/test_v2.py
+++ b/tests/integration/docarray_v2/test_v2.py
@@ -1634,7 +1634,7 @@ def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[DummyEmbeddingDoc]:
 
     depl = Deployment(uses=SlowExecutorWithException)
     with depl:
-        da = DocList[TextDoc]([TextDoc(text='good') for _ in range(50)])
+        da = DocList[TextDoc]([TextDoc(text=f'good-{i}') for i in range(50)])
         da[4].text = 'fail'
         responses = depl.post(on='/foo', inputs=da, request_size=1, return_responses=True, continue_on_error=True, results_in_order=True)
         assert len(responses) == 50  # 1 request per input