Skip to content

Commit

Permalink
Fix: [Server][VideoEncodingTask] 様々な不具合修正
Browse files Browse the repository at this point in the history
  • Loading branch information
tsukumijima committed Nov 19, 2023
1 parent bc3c116 commit 1adffef
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 48 deletions.
105 changes: 58 additions & 47 deletions server/app/streams/VideoEncodingTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,13 @@ def buildFFmpegOptions(self,
def buildHWEncCOptions(self,
quality: QUALITY_TYPES,
encoder_type: Literal['QSVEncC', 'NVEncC', 'VCEEncC', 'rkmppenc'],
output_timestamp_offset: float,
) -> list[str]:
"""
QSVEncC・NVEncC・VCEEncC・rkmppenc (便宜上 HWEncC と総称) に渡すオプションを組み立てる
Args:
quality (QUALITY_TYPES): 映像の品質
encoder_type (Literal['QSVEncC', 'NVEncC', 'VCEEncC', 'rkmppenc']): エンコーダー (QSVEncC or NVEncC or VCEEncC or rkmppenc)
output_timestamp_offset (float): セグメントを出力する際のタイムスタンプ類のオフセット (DTS)
Returns:
list[str]: HWEncC に渡すオプションが連なる配列
Expand Down Expand Up @@ -285,9 +283,8 @@ def buildHWEncCOptions(self,
video_width = 1920
options.append(f'--output-res {video_width}x{video_height}')

# 出力するセグメント TS のタイムスタンプのオフセット
# タイムスタンプが前回エンコードしたセグメントの続きになるようにする
options.append(f'-m output_ts_offset:{output_timestamp_offset}')
# 入力のタイムスタンプを出力にコピーする
options.append(f'--timestamp-passthrough')

# 出力
options.append('--output-format mpegts') # MPEG-TS 出力ということを明示
Expand Down Expand Up @@ -317,9 +314,11 @@ async def __runEncoder(self, segment: VideoStreamSegment) -> bool:
bool: 最終的にエンコードが成功したかどうか
"""

# エンコーダーの種類を取得
ENCODER_TYPE = Config().general.encoder

# まだエンコーダーが起動している場合はエラー
assert self._tsreadex_process is None, 'tsreadex process is already running.'
assert self._encoder_process is None, 'encoder process is already running.'
assert self._tsreadex_process is None or self._encoder_process is None, f'{ENCODER_TYPE} process is already running.'

# 処理対象の VideoStreamSegment をエンコード中に設定
segment.is_encode_processing = True
Expand Down Expand Up @@ -376,12 +375,8 @@ async def __runEncoder(self, segment: VideoStreamSegment) -> bool:

# ***** エンコーダープロセスの作成と実行 *****

# エンコーダーの種類を取得
encoder_type = Config().general.encoder
# encoder_type = 'FFmpeg' # デバッグ用

# FFmpeg
if encoder_type == 'FFmpeg':
if ENCODER_TYPE == 'FFmpeg':

# オプションを取得
encoder_options = self.buildFFmpegOptions(self.video_stream.quality, segment.start_dts_second)
Expand All @@ -400,12 +395,12 @@ async def __runEncoder(self, segment: VideoStreamSegment) -> bool:
else:

# オプションを取得
encoder_options = self.buildHWEncCOptions(self.video_stream.quality, encoder_type, segment.start_dts_second)
Logging.info(f'[Video: {self.video_stream.video_stream_id}] {encoder_type} Commands:\n{encoder_type} {" ".join(encoder_options)}')
encoder_options = self.buildHWEncCOptions(self.video_stream.quality, ENCODER_TYPE)
Logging.info(f'[Video: {self.video_stream.video_stream_id}] {ENCODER_TYPE} Commands:\n{ENCODER_TYPE} {" ".join(encoder_options)}')

# エンコーダープロセスを非同期で作成・実行
self._encoder_process = await asyncio.subprocess.create_subprocess_exec(
*[LIBRARY_PATH[encoder_type], *encoder_options],
*[LIBRARY_PATH[ENCODER_TYPE], *encoder_options],
stdin = tsreadex_read_pipe, # tsreadex からの入力
stdout = asyncio.subprocess.PIPE, # ストリーム出力
stderr = asyncio.subprocess.DEVNULL,
Expand All @@ -417,8 +412,7 @@ async def __runEncoder(self, segment: VideoStreamSegment) -> bool:

# ***** エンコーダーの出力の読み取り *****

assert self._tsreadex_process.returncode is None, f'tsreadex exited with exit code {self._tsreadex_process.returncode}.'
assert self._encoder_process.returncode is None, f'{encoder_type} exited with exit code {self._encoder_process.returncode}.'
assert self._encoder_process.returncode is None, f'{ENCODER_TYPE} exited with exit code {self._encoder_process.returncode}.'

# エンコーダーが完了するまで待機し、エンコード済みのセグメントデータを取得する
assert self._encoder_process.stdout is not None
Expand All @@ -435,7 +429,7 @@ async def __runEncoder(self, segment: VideoStreamSegment) -> bool:
if self._encoder_process.returncode != 0 and self._encoder_process.returncode is not None:
segment.is_encode_processing = False # エンコードが失敗したのでエンコード中フラグを下ろす
self.__terminateEncoder()
Logging.error(f'[Video: {self.video_stream.video_stream_id}] {encoder_type} exited with non-zero exit code.')
Logging.error(f'[Video: {self.video_stream.video_stream_id}] {ENCODER_TYPE} exited with non-zero exit code.')
return False

# この時点で tsreadex とエンコーダーは終了しているはずだが、念のため強制終了しておく
Expand All @@ -462,14 +456,19 @@ async def __pushTSPacketDataToEncoder(self, packet: bytes | None) -> None:
packet (bytes): MPEG2-TS パケット or これ以上エンコーダーに投入するパケットがない場合は None
"""

# エンコーダーの種類を取得
ENCODER_TYPE = Config().general.encoder

# すでにキャンセルされている場合は何もしない
# キャンセルされている場合当然ながらエンコーダープロセスも起動していない状態になるので、下記の assert より上に書いている
if self._is_cancelled is True:
return

# エンコーダープロセスが起動していない場合はエラー
assert self._tsreadex_process is not None, 'tsreadex process is not running.'
assert self._encoder_process is not None, 'encoder process is not running.'
# エンコーダープロセスが起動していない場合はエラーを出力した上で何もしない
## 基本発生しないはずだし発生してはならない
if self._tsreadex_process is None or self._encoder_process is None:
Logging.error(f'[Video: {self.video_stream.video_stream_id}] {ENCODER_TYPE} process is not running. TS packet is discarded.')
return

# これ以上エンコーダーに投入するパケットがない場合は tsreadex の標準入力を閉じる
assert self._tsreadex_process.stdin is not None
Expand All @@ -491,6 +490,9 @@ def __terminateEncoder(self) -> None:
起動中のエンコーダープロセスを強制終了する
"""

# エンコーダーの種類を取得
ENCODER_TYPE = Config().general.encoder

# tsreadex プロセスを強制終了する
if self._tsreadex_process is not None:
try:
Expand All @@ -506,6 +508,7 @@ def __terminateEncoder(self) -> None:
except Exception:
pass
self._encoder_process = None
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}] Terminated {ENCODER_TYPE} process.')


async def run(self, first_segment_index: int) -> None:
Expand All @@ -531,6 +534,9 @@ async def run(self, first_segment_index: int) -> None:

Logging.info(f'[Video: {self.video_stream.video_stream_id}] VideoEncodingTask started.')

# エンコーダーの種類を取得
ENCODER_TYPE = Config().general.encoder

# 視聴対象の録画番組が放送されたチャンネルのサービス ID
SERVICE_ID: int | None = self.recorded_program.channel.service_id if self.recorded_program.channel is not None else None

Expand Down Expand Up @@ -571,9 +577,6 @@ async def run(self, first_segment_index: int) -> None:
if len(packet) != ts.PACKET_SIZE:
Logging.error(f'[Video: {self.video_stream.video_stream_id}] Packet size is not 188 bytes.')
continue
if packet[0] != 0x47:
Logging.error(f'[Video: {self.video_stream.video_stream_id}] Sync byte is not 0x47.')
continue

# TS パケットの PID を取得する
PID = ts.pid(packet)
Expand Down Expand Up @@ -634,7 +637,7 @@ async def run(self, first_segment_index: int) -> None:
)
assert BYTE_RATE is not None
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}] '
f'Approximate Bitrate: {(BYTE_RATE / 1024 / 1024 * 8):.3f} Mbps')
f'Approximate Bitrate: {(BYTE_RATE / 1024 / 1024 * 8):.3f} Mbps')

# 最初の PCR 値を取得してから読み取った TS パケットの累計バイト数を更新する
# 最初の PCR 値が取得されるまではカウントしない
Expand Down Expand Up @@ -705,9 +708,6 @@ async def run(self, first_segment_index: int) -> None:
if len(packet) != ts.PACKET_SIZE:
Logging.error(f'[Video: {self.video_stream.video_stream_id}] Packet size is not 188 bytes.')
continue
if packet[0] != 0x47 and is_seeking is False: # シーク中は同期バイトをチェックしない
Logging.error(f'[Video: {self.video_stream.video_stream_id}] Sync byte is not 0x47.')
continue

# TS パケットの PID を取得する
PID = ts.pid(packet)
Expand Down Expand Up @@ -752,8 +752,10 @@ async def run(self, first_segment_index: int) -> None:
current_dts = cast(int, mpeg2_or_h264_or_h265.dts() if mpeg2_or_h264_or_h265.has_dts() else mpeg2_or_h264_or_h265.pts())

# 現在の映像パケットの DTS (秒換算) を算出する
## 事前の概算ビットレートによるシークでの位置次第では FIRST_KEYFRAME_DTS_SECOND よりも小さくなることがある点に注意
## 例えば 20 秒時点のセグメントから開始する場合、概算ビットレートによるシークでは安全領域として 30 秒前から読み取りを開始するため、
## 必然的に max(0, 20 - 30) でファイル先頭 (0) から 30 秒分のデータを読み飛ばすことになる
current_dts_second = current_dts / ts.HZ
assert (segment_index == -1 or current_dts_second - FIRST_KEYFRAME_DTS_SECOND >= 0), 'Current Relative Time is negative.'

# 次のセグメントを取得 (最後のセグメントの場合は None)
next_segment = self.video_stream.segments[segment_index + 1] if segment_index + 1 < len(self.video_stream.segments) else None
Expand All @@ -766,28 +768,28 @@ async def run(self, first_segment_index: int) -> None:
# ここを通った時点で確実に first_segment_index のセグメントまでのシークが完了している
is_seeking = False
Logging.info(f'[Video: {self.video_stream.video_stream_id}] '
f'Seeked to {current_dts_second - FIRST_KEYFRAME_DTS_SECOND} seconds.')
f'Seeked to {current_dts_second - FIRST_KEYFRAME_DTS_SECOND} seconds.')
else:
# ここを通った時点で前のセグメントの切り出しが完了している
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
f'Cut out {segment_bytes_count / 1024 / 1024:.3f} MiB.')
f'Cut out {segment_bytes_count / 1024 / 1024:.3f} MiB.')

# まだ前のセグメントのエンコーダーが起動している場合
if segment_encoding_task is not None and segment_encoding_task.done() is False:

# もう投入するパケットがないことを通知する (これで tsreadex の標準入力が閉じられ、エンコードが完了する)
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
'Waiting for encoder to finish...')
f'Waiting for {ENCODER_TYPE} to finish...')
await self.__pushTSPacketDataToEncoder(None)

# エンコードが完了するまで待機する
result = await segment_encoding_task
if result is True:
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
'Successfully encoded HLS segment.')
'Successfully encoded HLS segment.')
else:
Logging.error(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
'Failed to encode HLS segment.')
'Failed to encode HLS segment.')

# この時点で次のセグメントのエンコードがすでに完了している場合、これ以上エンコードタスクを動かす必要はないのでループを抜ける
## 例えば 15 分地点からエンコードタスクが開始されて 30 分地点までエンコードした後、
Expand All @@ -803,27 +805,36 @@ async def run(self, first_segment_index: int) -> None:
segment = self.video_stream.segments[segment_index]
segment_bytes_count = 0 # バイト数カウントをリセットする
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
f'Current Relative Time: {current_dts_second - FIRST_KEYFRAME_DTS_SECOND}')
f'Current Relative Time: {current_dts_second - FIRST_KEYFRAME_DTS_SECOND}')
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
f'Current DTS (Seconds): {current_dts_second}')
f'Current DTS (Seconds): {current_dts_second}')
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
f'Segment Start DTS (Seconds): {segment.start_dts_second}')
f'Segment Start DTS (Seconds): {segment.start_dts_second}')
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
f'Start: {segment.start_dts_second - FIRST_KEYFRAME_DTS_SECOND} / '
f'End: {(segment.start_dts_second - FIRST_KEYFRAME_DTS_SECOND) + segment.duration}')
f'Start: {segment.start_dts_second - FIRST_KEYFRAME_DTS_SECOND} / '
f'End: {(segment.start_dts_second - FIRST_KEYFRAME_DTS_SECOND) + segment.duration}')
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
'Encoding HLS segment...')
'Encoding HLS segment...')

# 非同期でエンコーダーを起動する
segment_encoding_task = asyncio.create_task(self.__runEncoder(segment))

# エンコーダープロセスが完全に起動するのを待機する
# ここでしっかり待機しないと微妙なタイミングの差で self.__pushTSPacketDataToEncoder() で書き込みエラーが発生することがある
while self._tsreadex_process is None or self._encoder_process is None:
await asyncio.sleep(0.01)

# シーク中でなければ、現在処理中のセグメントのデータをエンコーダーに投入する
# シーク中でなければ、現在処理中のセグメントのデータをリアルタイムにエンコーダーに投入する
if is_seeking is False:

# エンコーダープロセスが完全に起動するのを待機する
if self._tsreadex_process is None or self._encoder_process is None:
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] '
f'Waiting for {ENCODER_TYPE} to start...')
while (self._tsreadex_process is None or self._encoder_process is None) and self._is_cancelled is False:
await asyncio.sleep(0.01)
if self._is_cancelled is False:
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] {ENCODER_TYPE} started.')
else:
# エンコードタスクがキャンセルされた場合は処理を中断する
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] {ENCODER_TYPE} cancelled.')

# エンコーダーに現在処理中の TS パケットを投入する
await self.__pushTSPacketDataToEncoder(packet)
segment_bytes_count += ts.PACKET_SIZE # バイト数カウントを更新する

Expand All @@ -846,7 +857,7 @@ async def run(self, first_segment_index: int) -> None:
if segment_encoding_task is not None and segment_encoding_task.done() is False:

# もう投入するパケットがないことを通知する (これで tsreadex の標準入力が閉じられ、エンコードが完了する)
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] Waiting for encoder to finish...')
Logging.info(f'[Video: {self.video_stream.video_stream_id}][Segment {segment_index}] Waiting for {ENCODER_TYPE} to finish...')
await self.__pushTSPacketDataToEncoder(None)

# エンコードが完了するまで待機する
Expand Down
Loading

0 comments on commit 1adffef

Please sign in to comment.