diff --git a/server/app/app.py b/server/app/app.py index 18e0d0c7..e962f13f 100644 --- a/server/app/app.py +++ b/server/app/app.py @@ -36,9 +36,6 @@ from app.utils.EDCB import EDCBTuner -# このアプリケーションの実行中のイベントループ -loop = asyncio.get_running_loop() - # もし Config() の実行時に AssertionError が発生した場合は、LoadConfig() を実行してサーバー設定データをロードする ## 自動リロードモードでは app.py がサーバープロセスのエントリーポイントになるため、 ## サーバープロセス上にサーバー設定データがロードされていない状態になる diff --git a/server/app/streams/VideoEncodingTask.py b/server/app/streams/VideoEncodingTask.py index 044959f0..7a1749eb 100644 --- a/server/app/streams/VideoEncodingTask.py +++ b/server/app/streams/VideoEncodingTask.py @@ -7,7 +7,6 @@ import os import subprocess import threading -import time from biim.mpeg2ts import ts from biim.mpeg2ts.packetize import packetize_section from biim.mpeg2ts.pat import PATSection @@ -48,6 +47,9 @@ def __init__(self, video_stream: VideoStream) -> None: # ビデオストリームのインスタンスをセット self.video_stream = video_stream + # 現在実行中のイベントループ + self._loop = asyncio.get_running_loop() + # tsreadex とエンコーダーのプロセス self._tsreadex_process: subprocess.Popen[bytes] | None = None self._encoder_process: subprocess.Popen[bytes] | None = None @@ -291,7 +293,7 @@ def buildHWEncCOptions(self, options.append(f'--output-res {video_width}x{video_height}') # 入力のタイムスタンプを出力にコピーする - options.append(f'--timestamp-passthrough --log-level trace') + options.append(f'--timestamp-passthrough') # 音声 options.append(f'--audio-codec aac:aac_coder=twoloop --audio-bitrate {QUALITY[quality].audio_bitrate}') @@ -309,17 +311,15 @@ def buildHWEncCOptions(self, return result - def __runEncoder(self, segment: VideoStreamSegment) -> bool: + def __runEncoder(self, segment: VideoStreamSegment) -> None: """ 録画 TS データから直接切り出した生の MPEG-TS チャンクをエンコードするエンコーダープロセスを開始する セグメントのキューに入れられた TS パケットをエンコーダーに順次投入し、エンコード済みのセグメントデータを VideoStreamSegment に書き込む - 非同期 (asyncio.create_task()) で実行するとイベントループがビジーになったりなど厄介な問題が発生するため同期関数とし、__run() とは別のスレッド上で実行する + 非同期 (asyncio.create_task()) で実行するとイベントループがビジーになったりなど厄介な問題が発生するため同期メソッドとしている + このメソッドはエンコードが完了/失敗するか、エンコードタスクがキャンセルされるまでブロックする Args: segment (VideoStreamSegment): エンコード対象の VideoStreamSegment のデータ - - Returns: - bool: 最終的にエンコードが成功したかどうか """ # エンコーダーの種類を取得 @@ -407,108 +407,144 @@ def __runEncoder(self, segment: VideoStreamSegment) -> bool: # エンコーダープロセスを非同期で作成・実行 self._encoder_process = subprocess.Popen( - [LIBRARY_PATH[ENCODER_TYPE], *encoder_options, '--log-packets', f'seg_{segment.sequence_index}_in_packets.log', '--log-mux-ts', f'seg_{segment.sequence_index}_out_muxts.log'], + [LIBRARY_PATH[ENCODER_TYPE], *encoder_options], stdin = self._tsreadex_process.stdout, # tsreadex からの入力 stdout = subprocess.PIPE, # ストリーム出力 # stderr = subprocess.DEVNULL, stderr = None, # デバッグ用 ) - # ***** エンコーダーへの切り出した TS パケットの書き込み ***** - - # TS パケットのバッファ - # バッファサイズ: 188B (TS Packet Size) * 256 = 48128B - ts_packet_buffer = bytearray() - - # エンコーダーに投入した TS パケットのバイト数 - segment_bytes_count = 0 - - # Queue からの TS パケットの取得に移る前に少し待つ (重要) - time.sleep(0.1) - - while True: - - # Queue から切り出された TS パケットを随時取得 - ts_packet = segment.segment_ts_packet_queue.get() - - # バッファに TS パケットを追加 - if ts_packet is not None: - ts_packet_buffer += ts_packet - - # 48128B に到達した or これ以上エンコーダーに投入するパケットがなくなったらバッファをエンコーダー (正確にはその前段の tsreadex) に投入 - assert self._tsreadex_process.stdin is not None - if len(ts_packet_buffer) >= 188 * 256 or ts_packet is None: - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Writing TS packets to {ENCODER_TYPE}... (Buffer Size: {len(ts_packet_buffer)}B)') - try: - self._tsreadex_process.stdin.write(ts_packet_buffer) - except Exception as ex: - Logging.error(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Failed to write TS packets to {ENCODER_TYPE}.') - Logging.error(ex) - # エンコーダーに投入した TS パケットのバイト数を加算 - segment_bytes_count += len(ts_packet_buffer) - # バッファを空にする - ts_packet_buffer = bytearray() - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Write TS packets to {ENCODER_TYPE}. Cut out {segment_bytes_count / 1024 / 1024:.3f} MiB in total.') - - # これ以上エンコーダーに投入するパケットがなくなったら tsreadex の標準入力を閉じ、エンコーダーの出力の読み取りを待つ - if ts_packet is None: # None はこれ以上投入するパケットがないことを示す - Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Cut out {segment_bytes_count / 1024 / 1024:.3f} MiB.') - Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Waiting for {ENCODER_TYPE} to finish...') - self._tsreadex_process.stdin.close() - break # ループを抜ける - - # すでにエンコーダーが終了しているならループを抜ける - if self._tsreadex_process.poll() is not None or self._encoder_process.poll() is not None: - break + # 処理対象の VideoStreamSegment をエンコード中状態に設定 + segment.encode_status = 'Encoding' - # すでにエンコードタスクがキャンセルされている - if self._is_cancelled is True: - break + # 受信したエンコード済み TS パケットのバッファ + ## 最終的に単一のセグメントのすべての TS パケットが入る + encoded_ts_packet_buffer = bytearray() - # ***** エンコーダーからの出力の読み取り ***** + # ***** 切り出した TS パケットをエンコーダーに送信するスレッド ***** - # 上記処理で None を Queue から受け取ったタイミングで正常終了するはずなので、エンコード済みのセグメントデータを取得する - assert self._encoder_process.stdout is not None - segment_ts = self._encoder_process.stdout.read() + def Writer() -> None: - # 処理対象の VideoStreamSegment をエンコード完了状態に設定 - assert segment.encoded_segment_ts_future.done() is False # すでに完了しているはずはない - segment.is_encode_completed = True + # 送信する TS パケットのバッファ + # バッファサイズ: 188B (TS Packet Size) * 256 = 48128B + ts_packet_buffer = bytearray() + + # エンコーダーに投入した TS パケットのバイト数 + segment_bytes_count = 0 + + while True: + + # すでにエンコーダーが強制終了されているならループを抜ける + ## 強制終了された後は None になるのを利用する + ## エンコードタスクがキャンセルされた時にしか発生しないはず + if self._tsreadex_process is None or self._encoder_process is None or self._is_cancelled is True: + break + + # Queue から切り出された TS パケットを随時取得 + ts_packet = segment.segment_ts_packet_queue.get() + if ts_packet is not None: + ts_packet_buffer += ts_packet + + # 48128B に到達した or これ以上エンコーダーに投入するパケットがなくなったら、 + # バッファをエンコーダー (正確にはその前段の tsreadex) に投入 + if len(ts_packet_buffer) >= 188 * 256 or ts_packet is None: + try: + if self._tsreadex_process is not None: # 念のため + assert self._tsreadex_process.stdin is not None + self._tsreadex_process.stdin.write(ts_packet_buffer) + self._tsreadex_process.stdin.flush() + except Exception as ex: + Logging.error(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' + f'Failed to write TS packets to {ENCODER_TYPE}. ({ex})') + + # エンコーダーに投入した TS パケットのバイト数を加算 + segment_bytes_count += len(ts_packet_buffer) + + # バッファを空にする + ts_packet_buffer = bytearray() + + # これ以上エンコーダーに投入するパケットがなくなったら tsreadex の標準入力を閉じ、エンコーダーの出力の読み取りを待つ + if ts_packet is None: # None はこれ以上投入するパケットがないことを示す + Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' + f'Cut out {segment_bytes_count / 1024 / 1024:.3f} MiB.') + Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' + f'Waiting for {ENCODER_TYPE} to finish...') + if self._tsreadex_process is not None: # 念のため + assert self._tsreadex_process.stdin is not None + self._tsreadex_process.stdin.close() + break # ループを抜ける + + # ***** エンコード済み TS パケット (セグメントデータ) をエンコーダーから受信するスレッド ***** + + def Reader() -> None: + nonlocal encoded_ts_packet_buffer + + while True: + + # すでにエンコーダーが強制終了されているならループを抜ける + ## 強制終了された後は None になるのを利用する + ## エンコードタスクがキャンセルされた時にしか発生しないはず + if self._tsreadex_process is None or self._encoder_process is None or self._is_cancelled is True: + break + + if self._encoder_process is not None: # 念のため + try: + # エンコーダーの出力を 48128B (188 * 256) ずつ受信してバッファに保存する + assert self._encoder_process.stdout is not None + ts_packets = self._encoder_process.stdout.read(188 * 256) + + # 出力の終端に到達したらループを抜ける + if ts_packets == b'': + break + + # バッファに受信したエンコード済み TS パケットを追加 + encoded_ts_packet_buffer += ts_packets + + except Exception as ex: + Logging.error(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' + f'Failed to read encoded TS packets from {ENCODER_TYPE}. ({ex})') + + # ***** Writer と Reader の終了後、エンコード済み TS パケットを VideoStreamSegment に書き込む ***** + + # Writer と Reader を同時実行して、スレッドを終了するまで待つ + writer_thread = threading.Thread(target=Writer, daemon=True) + reader_thread = threading.Thread(target=Reader, daemon=True) + writer_thread.start() + reader_thread.start() + writer_thread.join() + reader_thread.join() # この時点でエンコードタスクがキャンセルされていればエンコード済みのセグメントデータを放棄して中断する if self._is_cancelled is True: self.__terminateEncoder() Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - 'Discarded encoded segment data because cancelled.') + 'Discarded encoded segment data because cancelled.') # エンコード作業自体を中断したので、このセグメントの状態をリセットする - segment.encoded_segment_ts_future.set_result(b'') # この Future を待っている箇所があるかもなので状態リセット前に解決しておく - segment.resetState() - return False + ## resetState() は asyncio.Future() を作り直す関係で非同期なので、メインスレッドに移譲して実行する + asyncio.run_coroutine_threadsafe(segment.resetState(), self._loop) + return - # この時点でエンコーダーの exit code が 0 か None (まだプロセスが起動している) でなければ何らかの理由でエンコードに失敗している - if self._encoder_process.poll() != 0 and self._encoder_process.poll() is not None: + # この時点でエンコーダーの exit code が None (まだプロセスが起動している) か 0 でなければ何らかの理由でエンコードに失敗している + if self._encoder_process.poll() is not None and self._encoder_process.poll() != 0: self.__terminateEncoder() Logging.error(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'{ENCODER_TYPE} exited with exit code {self._encoder_process.poll()}.') - # 返すデータがないが復旧もできないので、Future には空のデータを設定する - segment.encoded_segment_ts_future.set_result(b'') - return False + f'{ENCODER_TYPE} exited with exit code {self._encoder_process.poll()}.') + # おそらく復旧しようがないが、一応このセグメントの状態をリセットする + ## resetState() は asyncio.Future() を作り直す関係で非同期なので、メインスレッドに移譲して実行する + asyncio.run_coroutine_threadsafe(segment.resetState(), self._loop) + return - # この時点で tsreadex とエンコーダーは終了しているはずだが、念のため強制終了しておく - # 上記の判定を行った後に行うのが重要 (kill すると exit code が 0 以外になる可能性があるため) - self.__terminateEncoder() + # 処理対象の VideoStreamSegment をエンコード完了状態に設定 + segment.encode_status = 'Completed' # エンコード後のセグメントデータを VideoStreamSegment に書き込む # ここで設定したエンコード済みのセグメントデータが API で返される - segment.encoded_segment_ts_future.set_result(segment_ts) - + segment.encoded_segment_ts_future.set_result(bytes(encoded_ts_packet_buffer)) Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] Successfully encoded HLS segment.') - return True + + # この時点で tsreadex とエンコーダーは終了しているはずだが、念のため強制終了しておく + # 最後に行うのが重要 (kill すると exit code が 0 以外になる可能性があるため) + self.__terminateEncoder() def __terminateEncoder(self) -> None: @@ -540,11 +576,11 @@ def __terminateEncoder(self) -> None: def __run(self, first_segment_index: int) -> None: """ HLS エンコードタスクを実行する - 非同期 (asyncio.create_task()) で実行するとイベントループがビジーになったりなど厄介な問題が発生するため、意図的に同期関数としている + 非同期 (asyncio.create_task()) で実行するとイベントループがビジーになったりなど厄介な問題が発生するため、意図的に同期メソッドとしている aiofiles は単に裏でスレッドプールに投げてるだけなので、それなら全部別スレッドで実行したほうがパフォーマンスが良いと判断 TODO: 現状 PCR や PTS が一周した時の処理は何も考えてない - biim の実装をかなり参考にした + biim の実装をめちゃくちゃ参考にした (圧倒的感謝…!!) ref: https://github.com/monyone/biim/blob/other/static-ondemand-hls/seekable.py ref: https://github.com/monyone/biim/blob/other/static-ondemand-hls/vod_main.py ref: https://github.com/monyone/biim/blob/other/static-ondemand-hls/vod_fmp4.py @@ -567,6 +603,9 @@ def __run(self, first_segment_index: int) -> None: Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask started.') + # エンコーダーの種類を取得 + ENCODER_TYPE = Config().general.encoder + # 視聴対象の録画番組が放送されたチャンネルのサービス ID SERVICE_ID: int | None = self.recorded_program.channel.service_id if self.recorded_program.channel is not None else None @@ -810,12 +849,9 @@ def __run(self, first_segment_index: int) -> None: ## monotonic_segment_index が初期値のときは TS パケットの投入は行われない pat_packets = packetize_section(PAT, False, False, PAT_PID, 0, pat_continuity_counter) pat_continuity_counter = (pat_continuity_counter + len(pat_packets)) & 0x0F # Continuity Counter を更新 - if monotonic_segment_index >= 0 and self.video_stream.segments[monotonic_segment_index].is_encode_completed is False: + if monotonic_segment_index >= 0 and self.video_stream.segments[monotonic_segment_index].encode_status != 'Completed': for pat_packet in pat_packets: self.video_stream.segments[monotonic_segment_index].segment_ts_packet_queue.put(pat_packet) - if len(pat_packets) > 0: - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {monotonic_segment_index}] ' - f'Put PAT packets to segment queue.') # 最新のパケット化済み PAT を保持する ## エンコーダーに投入したかに関わらず常に保持する必要がある latest_pat_packets = pat_packets @@ -876,12 +912,9 @@ def __run(self, first_segment_index: int) -> None: ## monotonic_segment_index が初期値のときは TS パケットの投入は行われない pmt_packets = packetize_section(PMT, False, False, PMT_PID, 0, pmt_continuity_counter) pmt_continuity_counter = (pmt_continuity_counter + len(pmt_packets)) & 0x0F # Continuity Counter を更新 - if monotonic_segment_index >= 0 and self.video_stream.segments[monotonic_segment_index].is_encode_completed is False: + if monotonic_segment_index >= 0 and self.video_stream.segments[monotonic_segment_index].encode_status != 'Completed': for pmt_packet in pmt_packets: self.video_stream.segments[monotonic_segment_index].segment_ts_packet_queue.put(pmt_packet) - if len(pmt_packets) > 0: - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {monotonic_segment_index}] ' - f'Put PMT packets to segment queue.') # 最新のパケット化済み PMT を保持する ## エンコーダーに投入したかに関わらず常に保持する必要がある latest_pmt_packets = pmt_packets @@ -907,51 +940,44 @@ def __run(self, first_segment_index: int) -> None: # PTS がレンジ内にあれば if segment.start_pts <= current_pts <= segment.end_pts: - # この時点で前のセグメントのエンコードが終わっておらず、かつ切り出し終了 PTS から 3 秒以上が経過している場合、 - # もう前のセグメントに該当するパケットは降ってこないと判断し、もう投入するパケットがないことをエンコーダーに通知する + # この時点で前のセグメントのエンコードが終わっておらず、かつ現在の PTS が切り出し終了 PTS から 1 秒以上が経過している場合、 + # もう前のセグメントに該当するパケットは降ってこないだろうと判断し、もう投入するパケットがないことをエンコーダーに通知する ## これで tsreadex の標準入力が閉じられ、エンコードが完了する if (segment.sequence_index - 1 >= 0) and \ - (self.video_stream.segments[segment.sequence_index - 1].is_encode_completed is False) and \ - (current_pts - self.video_stream.segments[segment.sequence_index - 1].end_pts >= 3 * ts.HZ): + (self.video_stream.segments[segment.sequence_index - 1].encode_status != 'Completed') and \ + (current_pts - self.video_stream.segments[segment.sequence_index - 1].end_pts >= 1 * ts.HZ): self.video_stream.segments[segment.sequence_index - 1].segment_ts_packet_queue.put(None) - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index - 1}] ' - f'Put None to segment queue.') - # エンコーダーの終了を待つ (重要) + # もう投入するパケットがないことを通知したので、エンコーダーの終了を待つ (重要) ## ファイルの読み取りよりエンコードの方が基本的に遅いので、前のセグメントのエンコード中に次のエンコードを開始しないようにする if self._encoder_process is not None: + Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index - 1}]' + f' Waiting for {ENCODER_TYPE} process to exit...') self._encoder_process.wait() # 当該セグメントのエンコードがすでに完了している場合は何もしない ## 中間に数個だけ既にエンコードされているセグメントがあるケースでは、 ## それらのエンコード完了済みセグメントの切り出し&エンコード処理をスキップして次のセグメントに進むことになる - if segment.is_encode_completed is True: + if segment.encode_status == 'Completed': break # 当該セグメントの PTS レンジに一致する最初のパケットのみ、Queue への投入前に非同期でエンコーダーを起動する if segment.is_started is False: - # 非同期でエンコーダーを起動する - ## 実際にエンコーダーが起動するタイミングは前のセグメントのエンコードが完了した後になる + # バックグラウンドでエンコーダーを起動する + ## 実際にエンコーダーが起動するタイミングは前のセグメントのエンコーダーが終了した後になる Logging.info(f'[Video: {self.video_stream.video_stream_id}] Switched to next segment: {segment.sequence_index}') Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' f'Start: {(segment.start_pts - self.video_stream.segments[0].start_pts) / ts.HZ:.3f} / ' f'End: {((segment.start_pts - self.video_stream.segments[0].start_pts) / ts.HZ) + segment.duration_seconds:.3f}') - thread = threading.Thread(target=self.__runEncoder, args=(segment,)) - thread.start() + threading.Thread(target=self.__runEncoder, args=(segment,), daemon=True).start() # 前回取得した最新の PAT / PMT を投入する ## エンコーダーは最初の PAT / PMT より前のデータをデコードできないため、最初のパケットを投入する前に入れておく必要がある for pat_packet in latest_pat_packets: segment.segment_ts_packet_queue.put(pat_packet) - if len(latest_pat_packets) > 0: - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Put initial PAT packets to segment queue.') for pmt_packet in latest_pmt_packets: segment.segment_ts_packet_queue.put(pmt_packet) - if len(latest_pmt_packets) > 0: - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Put initial PMT packets to segment queue.') # このタイミングで monotonic_segment_index が初期値の場合のみ、 # PTS が存在しないパケットがどのセグメントに TS パケットを投入すれば良いかのインデックスを切り替える @@ -973,8 +999,6 @@ def __run(self, first_segment_index: int) -> None: # ここで Queue に投入したパケットがそのまま tsreadex → エンコーダーに投入される ## セグメント間で PTS レンジが重複することはないので、最初に一致したセグメントの Queue だけ処理すればよい segment.segment_ts_packet_queue.put(ts_packet) - Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] ' - f'Put PES header packet to segment queue.') break # ヘッダなしの続きの PES パケット (映像・音声・字幕・メタデータ) かつ、PID が一致する前回取得した PES ヘッダに PTS が含まれている場合 @@ -1012,7 +1036,7 @@ def __run(self, first_segment_index: int) -> None: # 当該セグメントのエンコードが完了していない場合のみ、TS パケットを投入する ## 中間に数個だけ既にエンコードされているセグメントがあるケースでは、 ## それらのエンコード完了済みセグメントの切り出し&エンコード処理をスキップして次のセグメントに進むことになる - if self.video_stream.segments[monotonic_segment_index].is_encode_completed is False: + if self.video_stream.segments[monotonic_segment_index].encode_status != 'Completed': self.video_stream.segments[monotonic_segment_index].segment_ts_packet_queue.put(ts_packet) # 途中でエンコードタスクがキャンセルされた場合、処理中のセグメントがあるかに関わらずエンコードタスクを終了する @@ -1049,6 +1073,7 @@ def cancel(self) -> None: if self._is_cancelled is False: # tsreadex とエンコーダーのプロセスを強制終了する + Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask cancelling...') self.__terminateEncoder() # エンコードタスクがキャンセルされたことを示すフラグを立てる diff --git a/server/app/streams/VideoStream.py b/server/app/streams/VideoStream.py index a17060e9..01141d12 100644 --- a/server/app/streams/VideoStream.py +++ b/server/app/streams/VideoStream.py @@ -10,7 +10,7 @@ from biim.mpeg2ts import ts from dataclasses import dataclass from rich import print -from typing import Any, Callable, ClassVar +from typing import Any, Callable, ClassVar, Literal from app.config import Config from app.constants import LIBRARY_PATH, QUALITY_TYPES @@ -55,7 +55,7 @@ class VideoStreamSegment: # 上記の情報に基づいて切り出された HLS セグメントの MPEG-TS データを入れる Queue (エンコーダーが同期関数なので同期用の Queue にしている) ## 切り出す際に随時入れられた後、同時に稼働中のエンコーダーに投入するために取り出される ## None が入れられたらこれ以上データは入らないことを示す - ## TS パケットは最大 256 個までに制限されている (188 * 256 = 48128B) + ## TS パケットは最大 10000 個までに制限されている (188 * 10000 = 1880000 バイト = 1.88 MB) ## Queue に TS パケットが溜まりすぎてもエンコーダーが追いつかないので、256 個以上溜まったら一旦読み取りを中断してエンコーダーに投入されるようにする segment_ts_packet_queue: queue.Queue[bytes | None] @@ -66,21 +66,21 @@ class VideoStreamSegment: ## 一度 True になるとリセットされない限り False には戻らない is_started: bool = False - # HLS セグメントのエンコードが完了したかどうか - ## エンコードが完了したら True になる - is_encode_completed: bool = False + # HLS セグメントのエンコードの状態 + encode_status: Literal['Pending', 'Encoding', 'Completed'] = 'Pending' - def resetState(self) -> None: + async def resetState(self) -> None: """ このセグメントの状態をリセットする - 主にエンコード中にエンコードタスクがキャンセルされた場合に呼び出される + 主にエンコード中にエンコードタスクがキャンセルされたか、セグメントのエンコードが失敗した場合に呼び出される + asyncio.Future を初期化するにはイベントループ上でなければならないらしいので、このメソッドを非同期関数にしている """ - self.segment_ts_packet_queue = queue.Queue(maxsize=256) + self.segment_ts_packet_queue = queue.Queue(maxsize=188 * 10000) self.encoded_segment_ts_future = asyncio.Future() self.is_started = False - self.is_encode_completed = False + self.encode_status = 'Pending' class VideoStream: @@ -233,7 +233,7 @@ async def getVirtualPlaylist(self) -> str: end_pts = 0, # 仮の値 end_file_position = 0, # 仮の値 duration_seconds = 0, # 仮の値 - segment_ts_packet_queue = queue.Queue(maxsize=256), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する + segment_ts_packet_queue = queue.Queue(maxsize=188 * 10000), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する encoded_segment_ts_future = asyncio.Future(), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する )) is_first_keyframe_found = True @@ -261,7 +261,7 @@ async def getVirtualPlaylist(self) -> str: end_pts = 0, # 仮の値 end_file_position = 0, # 仮の値 duration_seconds = 0, # 仮の値 - segment_ts_packet_queue = queue.Queue(maxsize=256), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する + segment_ts_packet_queue = queue.Queue(maxsize=188 * 10000), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する encoded_segment_ts_future = asyncio.Future(), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する )) segment_sequence += 1 @@ -322,14 +322,14 @@ async def getSegment(self, segment_sequence: int) -> bytes | None: asyncio.create_task(self._encoding_task.run(segment_sequence)) Logging.info(f'[Video: {self.video_stream_id}][Segment {segment_sequence}] New Encoding Task Started.') - # エンコードタスクは既に起動しているがこの時点でまだセグメントのエンコードが完了していなければ、このセグメントからエンコードタスクを非同期で開始する + # エンコードタスクは既に起動しているがこの時点でまだセグメントのエンコードが開始されていなければ、このセグメントからエンコードタスクを非同期で開始する # この HLS セグメントのエンコード処理が現在進行中の場合は完了まで待つ - elif segment.encoded_segment_ts_future.done() is False and segment.is_encode_completed is False: + elif segment.encoded_segment_ts_future.done() is False and segment.encode_status == 'Pending': # 0.5 秒待ってみて、それでも同じ状態のときだけエンコードタスクを再起動する # タイミングの関係であともう少しだけ待てば当該セグメントのエンコードが開始するのに…!という場合に備える await asyncio.sleep(0.5) - if segment.encoded_segment_ts_future.done() is False and segment.is_encode_completed is False: + if segment.encoded_segment_ts_future.done() is False and segment.encode_status == 'Pending': # 以前のエンコードタスクをキャンセルする # この時点で以前のエンコードタスクでエンコードが完了していたセグメントに関してはそのまま self.segments に格納されている