Skip to content

Commit

Permalink
Fix: [Server][VideoEncodingTask] 細かな不具合の修正
Browse files Browse the repository at this point in the history
  • Loading branch information
tsukumijima committed Nov 22, 2023
1 parent ae8a15e commit 3002ff7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
40 changes: 32 additions & 8 deletions server/app/streams/VideoEncodingTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import subprocess
import threading
import time
from biim.mpeg2ts import ts
from biim.mpeg2ts.packetize import packetize_section
from biim.mpeg2ts.pat import PATSection
Expand Down Expand Up @@ -331,6 +332,11 @@ def __runEncoder(self, segment: VideoStreamSegment) -> None:
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] '
f'Waiting previous {ENCODER_TYPE} to finish...')
self._encoder_process.wait()
time.sleep(0.1) # ちょっと待つ

# 待機後にエンコードタスクがキャンセルされた場合、処理を中断する
if self._is_cancelled is True:
return # メソッドの実行自体を終了する

Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] Encoding HLS segment...')

Expand Down Expand Up @@ -515,20 +521,27 @@ def Reader() -> None:
reader_thread.join()

# この時点でエンコードタスクがキャンセルされていればエンコード済みのセグメントデータを放棄して中断する
if self._is_cancelled is True:
## この時点でエンコーダープロセスが None になっている場合もキャンセルされたと判断する
if self._is_cancelled is True or self._encoder_process is None:
self.__terminateEncoder()
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] '
'Discarded encoded segment data because cancelled.')

# エンコード作業自体を中断したので、このセグメントの状態をリセットする
## resetState() は asyncio.Future() を作り直す関係で非同期なので、メインスレッドに移譲して実行する
asyncio.run_coroutine_threadsafe(segment.resetState(), self._loop)

# エンコードタスクの完了フラグを立てる
## 完了フラグを立てたタイミングで cancel() が戻る
self._is_finished = True
return

# この時点でエンコーダーの exit code が None (まだプロセスが起動している) か 0 でなければ何らかの理由でエンコードに失敗している
if self._encoder_process.poll() is not None and self._encoder_process.poll() != 0:
self.__terminateEncoder()
Logging.error(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] '
f'{ENCODER_TYPE} exited with exit code {self._encoder_process.poll()}.')

# おそらく復旧しようがないが、一応このセグメントの状態をリセットする
## resetState() は asyncio.Future() を作り直す関係で非同期なので、メインスレッドに移譲して実行する
asyncio.run_coroutine_threadsafe(segment.resetState(), self._loop)
Expand Down Expand Up @@ -940,12 +953,12 @@ def __run(self, first_segment_index: int) -> None:
# PTS がレンジ内にあれば
if segment.start_pts <= current_pts <= segment.end_pts:

# この時点で前のセグメントのエンコードが終わっておらず、かつ現在の PTS が切り出し終了 PTS から 1 秒以上が経過している場合、
# この時点で前のセグメントのエンコードが終わっておらず、かつ現在の PTS が切り出し終了 PTS から 3 秒以上が経過している場合、
# もう前のセグメントに該当するパケットは降ってこないだろうと判断し、もう投入するパケットがないことをエンコーダーに通知する
## これで tsreadex の標準入力が閉じられ、エンコードが完了する
if (segment.sequence_index - 1 >= 0) and \
(self.video_stream.segments[segment.sequence_index - 1].encode_status != 'Completed') and \
(current_pts - self.video_stream.segments[segment.sequence_index - 1].end_pts >= 1 * ts.HZ):
(current_pts - self.video_stream.segments[segment.sequence_index - 1].end_pts >= 3 * ts.HZ):
self.video_stream.segments[segment.sequence_index - 1].segment_ts_packet_queue.put(None)

# もう投入するパケットがないことを通知したので、エンコーダーの終了を待つ (重要)
Expand All @@ -955,6 +968,10 @@ def __run(self, first_segment_index: int) -> None:
f' Waiting for {ENCODER_TYPE} process to exit...')
self._encoder_process.wait()

# 待機後にエンコードタスクがキャンセルされた場合、処理を中断してエンコードタスクを終了する
if self._is_cancelled is True:
return # メソッドの実行自体を終了する

# 当該セグメントのエンコードがすでに完了している場合は何もしない
## 中間に数個だけ既にエンコードされているセグメントがあるケースでは、
## それらのエンコード完了済みセグメントの切り出し&エンコード処理をスキップして次のセグメントに進むことになる
Expand Down Expand Up @@ -1042,8 +1059,9 @@ def __run(self, first_segment_index: int) -> None:
# 途中でエンコードタスクがキャンセルされた場合、処理中のセグメントがあるかに関わらずエンコードタスクを終了する
# このとき、エンコーダーの出力はエンコードの完了を待つことなく破棄され、セグメントは処理開始前の状態にリセットされる
if self._is_cancelled is True:
return
return # メソッドの実行自体を終了する

# エンコードタスクでのすべての処理を完了した
self._is_finished = True
Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask finished.')

Expand All @@ -1060,7 +1078,7 @@ async def run(self, first_segment_index: int) -> None:
await asyncio.to_thread(self.__run, first_segment_index)


def cancel(self) -> None:
async def cancel(self) -> None:
"""
起動中のエンコードタスクをキャンセルし、起動中の外部プロセスを終了する
"""
Expand All @@ -1072,11 +1090,17 @@ def cancel(self) -> None:

if self._is_cancelled is False:

# エンコードタスクがキャンセルされたことを示すフラグを立てる
## この時点でまだ run() やエンコーダーが実行中であれば、run() やエンコーダーはこのフラグを見て自ら終了する
## できるだけ早い段階でフラグを立てておくことが重要
self._is_cancelled = True

# tsreadex とエンコーダーのプロセスを強制終了する
Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask cancelling...')
self.__terminateEncoder()

# エンコードタスクがキャンセルされたことを示すフラグを立てる
# この時点でまだ run() が実行中であれば、run() はこのフラグを見て自ら終了する
self._is_cancelled = True
# 完全に終了するまで待機する
while self._is_finished is False:
await asyncio.sleep(0.1)

Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask cancelled.')
10 changes: 5 additions & 5 deletions server/app/streams/VideoStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def __new__(cls, recorded_program: RecordedProgram, quality: QUALITY_TYPES) -> V

# キャンセルされない限り VIDEO_STREAM_TIMEOUT 秒後にインスタンスを破棄するタイマー
# cancel_destroy_timer() を呼び出すことでタイマーをキャンセルできる
instance._cancel_destroy_timer = SetTimeout(lambda: instance.destroy(), cls.VIDEO_STREAM_TIMEOUT)
instance._cancel_destroy_timer = SetTimeout(lambda: asyncio.create_task(instance.destroy()), cls.VIDEO_STREAM_TIMEOUT)

# 生成したインスタンスを登録する
cls.__instances[video_stream_id] = instance
Expand Down Expand Up @@ -184,7 +184,7 @@ def keepAlive(self) -> None:
self._cancel_destroy_timer()

# キャンセルされない限り VIDEO_STREAM_TIMEOUT 秒後にインスタンスを破棄するタイマーを設定する
self._cancel_destroy_timer = SetTimeout(lambda: self.destroy(), self.VIDEO_STREAM_TIMEOUT)
self._cancel_destroy_timer = SetTimeout(lambda: asyncio.create_task(self.destroy()), self.VIDEO_STREAM_TIMEOUT)


async def getVirtualPlaylist(self) -> str:
Expand Down Expand Up @@ -333,7 +333,7 @@ async def getSegment(self, segment_sequence: int) -> bytes | None:

# 以前のエンコードタスクをキャンセルする
# この時点で以前のエンコードタスクでエンコードが完了していたセグメントに関してはそのまま self.segments に格納されている
self._encoding_task.cancel()
await self._encoding_task.cancel()
Logging.info(f'[Video: {self.video_stream_id}][Segment {segment_sequence}] Previous Encoding Task Canceled.')

# 新たにエンコードタスクを非同期で開始する
Expand All @@ -346,7 +346,7 @@ async def getSegment(self, segment_sequence: int) -> bytes | None:
return encoded_segment_ts


def destroy(self) -> None:
async def destroy(self) -> None:
"""
ビデオストリームで実行中のエンコードなどの処理を終了し、ビデオストリームを破棄する
ユーザーが番組の視聴を終了した (keepAlive() が呼び出されなくなった) 場合に自動的に呼び出される
Expand All @@ -355,7 +355,7 @@ def destroy(self) -> None:
# 起動中のエンコードタスクがあればキャンセルする
# この時点ですでにエンコードを完了して終了している場合もある
if self._encoding_task is not None:
self._encoding_task.cancel()
await self._encoding_task.cancel()
self._encoding_task = None

# すべての HLS セグメントを削除する
Expand Down

0 comments on commit 3002ff7

Please sign in to comment.