diff --git a/server/app/streams/VideoEncodingTask.py b/server/app/streams/VideoEncodingTask.py index 7a1749eb..e7fee6f4 100644 --- a/server/app/streams/VideoEncodingTask.py +++ b/server/app/streams/VideoEncodingTask.py @@ -7,6 +7,7 @@ import os import subprocess import threading +import time from biim.mpeg2ts import ts from biim.mpeg2ts.packetize import packetize_section from biim.mpeg2ts.pat import PATSection @@ -331,6 +332,11 @@ def __runEncoder(self, segment: VideoStreamSegment) -> None: Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' f'Waiting previous {ENCODER_TYPE} to finish...') self._encoder_process.wait() + time.sleep(0.1) # ちょっと待つ + + # 待機後にエンコードタスクがキャンセルされた場合、処理を中断する + if self._is_cancelled is True: + return # メソッドの実行自体を終了する Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] Encoding HLS segment...') @@ -515,13 +521,19 @@ def Reader() -> None: reader_thread.join() # この時点でエンコードタスクがキャンセルされていればエンコード済みのセグメントデータを放棄して中断する - if self._is_cancelled is True: + ## この時点でエンコーダープロセスが None になっている場合もキャンセルされたと判断する + if self._is_cancelled is True or self._encoder_process is None: self.__terminateEncoder() Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' 'Discarded encoded segment data because cancelled.') + # エンコード作業自体を中断したので、このセグメントの状態をリセットする ## resetState() は asyncio.Future() を作り直す関係で非同期なので、メインスレッドに移譲して実行する asyncio.run_coroutine_threadsafe(segment.resetState(), self._loop) + + # エンコードタスクの完了フラグを立てる + ## 完了フラグを立てたタイミングで cancel() が戻る + self._is_finished = True return # この時点でエンコーダーの exit code が None (まだプロセスが起動している) か 0 でなければ何らかの理由でエンコードに失敗している @@ -529,6 +541,7 @@ def Reader() -> None: self.__terminateEncoder() Logging.error(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' f'{ENCODER_TYPE} exited with exit code {self._encoder_process.poll()}.') + # おそらく復旧しようがないが、一応このセグメントの状態をリセットする ## resetState() は asyncio.Future() を作り直す関係で非同期なので、メインスレッドに移譲して実行する asyncio.run_coroutine_threadsafe(segment.resetState(), self._loop) @@ -940,12 +953,12 @@ def __run(self, first_segment_index: int) -> None: # PTS がレンジ内にあれば if segment.start_pts <= current_pts <= segment.end_pts: - # この時点で前のセグメントのエンコードが終わっておらず、かつ現在の PTS が切り出し終了 PTS から 1 秒以上が経過している場合、 + # この時点で前のセグメントのエンコードが終わっておらず、かつ現在の PTS が切り出し終了 PTS から 3 秒以上が経過している場合、 # もう前のセグメントに該当するパケットは降ってこないだろうと判断し、もう投入するパケットがないことをエンコーダーに通知する ## これで tsreadex の標準入力が閉じられ、エンコードが完了する if (segment.sequence_index - 1 >= 0) and \ (self.video_stream.segments[segment.sequence_index - 1].encode_status != 'Completed') and \ - (current_pts - self.video_stream.segments[segment.sequence_index - 1].end_pts >= 1 * ts.HZ): + (current_pts - self.video_stream.segments[segment.sequence_index - 1].end_pts >= 3 * ts.HZ): self.video_stream.segments[segment.sequence_index - 1].segment_ts_packet_queue.put(None) # もう投入するパケットがないことを通知したので、エンコーダーの終了を待つ (重要) @@ -955,6 +968,10 @@ def __run(self, first_segment_index: int) -> None: f' Waiting for {ENCODER_TYPE} process to exit...') self._encoder_process.wait() + # 待機後にエンコードタスクがキャンセルされた場合、処理を中断してエンコードタスクを終了する + if self._is_cancelled is True: + return # メソッドの実行自体を終了する + # 当該セグメントのエンコードがすでに完了している場合は何もしない ## 中間に数個だけ既にエンコードされているセグメントがあるケースでは、 ## それらのエンコード完了済みセグメントの切り出し&エンコード処理をスキップして次のセグメントに進むことになる @@ -1042,8 +1059,9 @@ def __run(self, first_segment_index: int) -> None: # 途中でエンコードタスクがキャンセルされた場合、処理中のセグメントがあるかに関わらずエンコードタスクを終了する # このとき、エンコーダーの出力はエンコードの完了を待つことなく破棄され、セグメントは処理開始前の状態にリセットされる if self._is_cancelled is True: - return + return # メソッドの実行自体を終了する + # エンコードタスクでのすべての処理を完了した self._is_finished = True Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask finished.') @@ -1060,7 +1078,7 @@ async def run(self, first_segment_index: int) -> None: await asyncio.to_thread(self.__run, first_segment_index) - def cancel(self) -> None: + async def cancel(self) -> None: """ 起動中のエンコードタスクをキャンセルし、起動中の外部プロセスを終了する """ @@ -1072,11 +1090,17 @@ def cancel(self) -> None: if self._is_cancelled is False: + # エンコードタスクがキャンセルされたことを示すフラグを立てる + ## この時点でまだ run() やエンコーダーが実行中であれば、run() やエンコーダーはこのフラグを見て自ら終了する + ## できるだけ早い段階でフラグを立てておくことが重要 + self._is_cancelled = True + # tsreadex とエンコーダーのプロセスを強制終了する Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask cancelling...') self.__terminateEncoder() - # エンコードタスクがキャンセルされたことを示すフラグを立てる - # この時点でまだ run() が実行中であれば、run() はこのフラグを見て自ら終了する - self._is_cancelled = True + # 完全に終了するまで待機する + while self._is_finished is False: + await asyncio.sleep(0.1) + Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask cancelled.') diff --git a/server/app/streams/VideoStream.py b/server/app/streams/VideoStream.py index 01141d12..e7b2b840 100644 --- a/server/app/streams/VideoStream.py +++ b/server/app/streams/VideoStream.py @@ -124,7 +124,7 @@ def __new__(cls, recorded_program: RecordedProgram, quality: QUALITY_TYPES) -> V # キャンセルされない限り VIDEO_STREAM_TIMEOUT 秒後にインスタンスを破棄するタイマー # cancel_destroy_timer() を呼び出すことでタイマーをキャンセルできる - instance._cancel_destroy_timer = SetTimeout(lambda: instance.destroy(), cls.VIDEO_STREAM_TIMEOUT) + instance._cancel_destroy_timer = SetTimeout(lambda: asyncio.create_task(instance.destroy()), cls.VIDEO_STREAM_TIMEOUT) # 生成したインスタンスを登録する cls.__instances[video_stream_id] = instance @@ -184,7 +184,7 @@ def keepAlive(self) -> None: self._cancel_destroy_timer() # キャンセルされない限り VIDEO_STREAM_TIMEOUT 秒後にインスタンスを破棄するタイマーを設定する - self._cancel_destroy_timer = SetTimeout(lambda: self.destroy(), self.VIDEO_STREAM_TIMEOUT) + self._cancel_destroy_timer = SetTimeout(lambda: asyncio.create_task(self.destroy()), self.VIDEO_STREAM_TIMEOUT) async def getVirtualPlaylist(self) -> str: @@ -333,7 +333,7 @@ async def getSegment(self, segment_sequence: int) -> bytes | None: # 以前のエンコードタスクをキャンセルする # この時点で以前のエンコードタスクでエンコードが完了していたセグメントに関してはそのまま self.segments に格納されている - self._encoding_task.cancel() + await self._encoding_task.cancel() Logging.info(f'[Video: {self.video_stream_id}][Segment {segment_sequence}] Previous Encoding Task Canceled.') # 新たにエンコードタスクを非同期で開始する @@ -346,7 +346,7 @@ async def getSegment(self, segment_sequence: int) -> bytes | None: return encoded_segment_ts - def destroy(self) -> None: + async def destroy(self) -> None: """ ビデオストリームで実行中のエンコードなどの処理を終了し、ビデオストリームを破棄する ユーザーが番組の視聴を終了した (keepAlive() が呼び出されなくなった) 場合に自動的に呼び出される @@ -355,7 +355,7 @@ def destroy(self) -> None: # 起動中のエンコードタスクがあればキャンセルする # この時点ですでにエンコードを完了して終了している場合もある if self._encoding_task is not None: - self._encoding_task.cancel() + await self._encoding_task.cancel() self._encoding_task = None # すべての HLS セグメントを削除する