Skip to content

Commit

Permalink
Fix: [Server][VideoEncodingTask] 偶然 sync_byte でない 0x47 が含まれていた場合に同期に失…
Browse files Browse the repository at this point in the history
…敗する不具合を修正
  • Loading branch information
tsukumijima committed Nov 24, 2023
1 parent 611211b commit d72d6b2
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 42 deletions.
4 changes: 2 additions & 2 deletions client/src/services/player/PlayerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ class PlayerController {
}, 60 * 1000);
}

// ビデオ視聴: ビデオストリームのアクティブ状態を維持するために 15 秒おきに Keep-Alive API にリクエストを送る
// ビデオ視聴: ビデオストリームのアクティブ状態を維持するために 5 秒おきに Keep-Alive API にリクエストを送る
// HLS プレイリストやセグメントのリクエストが行われたタイミングでも Keep-Alive が行われるが、
// それだけではタイミング次第では十分ではないため、定期的に Keep-Alive を行う
// Keep-Alive が行われなくなったタイミングで、サーバー側で自動的にビデオストリームの終了処理 (エンコードタスクの停止) が行われる
Expand All @@ -773,7 +773,7 @@ class PlayerController {
if (this.player === null) return;
const api_quality = PlayerUtils.extractVideoAPIQualityFromDPlayer(this.player);
await APIClient.put(`${Utils.api_base_url}/streams/video/${player_store.recorded_program.id}/${api_quality}/keep-alive`);
}, 15 * 1000);
}, 5 * 1000);
}

// 再生/停止されたときのイベント
Expand Down
114 changes: 81 additions & 33 deletions server/app/streams/VideoEncodingTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import asyncio
import io
import os
import subprocess
import threading
Expand All @@ -28,6 +29,10 @@

class VideoEncodingTask:

# TS の sync_byte (int)
## ts.SYNC_BYTE は bytes なので、int 版の定数として定義する
SYNC_BYTE_INT: ClassVar[int] = 0x47

# エンコードする HLS セグメントの長さ (秒)
SEGMENT_DURATION_SECONDS: ClassVar[float] = float(10) # 10秒

Expand Down Expand Up @@ -335,6 +340,8 @@ def __runEncoder(self, segment: VideoStreamSegment) -> None:
ENCODER_TYPE = Config().general.encoder

# エンコーダーの多重起動を防止するためのロックを確保
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] '
'Waiting for the encoder lock...')
with self._encoder_lock:

# ロック確保後にエンコードタスクがキャンセルされた場合、処理を中断する
Expand Down Expand Up @@ -598,6 +605,42 @@ def __terminateEncoder(self) -> None:
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}] Terminated {ENCODER_TYPE} process.')


def __findTSPacketSyncByte(self, reader: io.BufferedReader) -> bool:
"""
指定された TS ファイルを sync_byte の直前まで読み進める (written with GPT-4)
このメソッドを実行した後 reader.read() を実行すると最初のバイトが sync_byte になる
TS ファイルには sync_byte ではない 0x47 も含まれているため、正しい sync_byte であることを毎回確認する必要がある
Args:
reader (io.BufferedReader): 読み込み対象の TS ファイル
Returns:
bool: 正しい sync_byte が見つかったかどうか
"""

while True:
sync_byte = reader.read(1)
if not sync_byte:
return False # End of file
if sync_byte[0] == VideoEncodingTask.SYNC_BYTE_INT:
# 次の 188 バイトも 0x47 で始まるかどうかを確認する
reader.seek(188 - 1, os.SEEK_CUR) # 次の 187 バイトを読み飛ばす
next_sync_byte = reader.read(1)
if not next_sync_byte:
return False # End of file
if next_sync_byte[0] == VideoEncodingTask.SYNC_BYTE_INT:
# 最初の sync_byte の "前" の位置に戻る
## これにより、reader.read() を実行すると最初のバイトが sync_byte になる
reader.seek(-1, os.SEEK_CUR) # 読み取った sync_byte 分を戻す
reader.seek(-188, os.SEEK_CUR) # TS パケット 1 つ分戻る
return True
else:
# 最初の sync_byte の "後" の位置に戻る
## ここに来ているということは最初に見つけた 0x47 が偶然の一致である可能性が高い
## 最初の sync_byte の直後から再度 sync_byte を探す
reader.seek(-188, os.SEEK_CUR) # TS パケット 1 つ分戻る


def __isPESPacketInSegment(self, pes_header: PES, is_video_stream: bool, segment: VideoStreamSegment) -> bool:
"""
PES パケットが指定されたセグメントの切り出し範囲に含まれるかどうかを判定する
Expand Down Expand Up @@ -675,23 +718,29 @@ def __run(self, first_segment_index: int) -> None:
latest_pcr_ts_packet_bytes: int | None = None # 最初の PCR 値を取得してから読み取った TS パケットの累計バイト数
pcr_remain_count: int = 30 # 30 回分の PCR 値を取得する (PCR を取得するたびに 1 減らす)
with open(self.recorded_video.file_path, 'rb') as reader:
while True:

# 同期バイトを探す
while True:
sync_byte: bytes = reader.read(1)
if sync_byte == ts.SYNC_BYTE:
break
elif sync_byte == b'':
# このループでファイルの終端に達することは基本ないはず
assert False, 'Invalid TS file. Sync byte is not found.'
# sync_byte を探す
is_sync_byte_found = self.__findTSPacketSyncByte(reader)
assert is_sync_byte_found is True, 'Failed to find sync byte.'

while True:

# 速度向上のため 188 * 10000 バイトのチャンクで一気に読み込んだ後、188 バイトごとの TS パケットに分割して処理する
chunk = ts.SYNC_BYTE + reader.read((ts.PACKET_SIZE * 10000) - 1)
for ts_packet in [chunk[i:i + ts.PACKET_SIZE] for i in range(0, len(chunk), ts.PACKET_SIZE)]:
if len(ts_packet) != ts.PACKET_SIZE:
Logging.error(f'[Video: {self.video_stream.video_stream_id}] Packet size is not 188 bytes.')
continue
# ファイルの終端に到達したら (read() してもデータが取れなくなったら) ループを抜ける
chunk = reader.read(ts.PACKET_SIZE * 10000)
if chunk == b'':
break

# 取得したチャンクを TS パケットごとに分割する
## 必ずしも 188 * 10000 バイト取得しているとは限らないが、188 バイトの倍数にはなっているはず
assert chunk[0] == VideoEncodingTask.SYNC_BYTE_INT, f'Invalid TS packet. sync_byte is not found. (0x{chunk[0]:02x})'
assert len(chunk) % ts.PACKET_SIZE == 0
ts_packets = [chunk[i:i + ts.PACKET_SIZE] for i in range(0, len(chunk), ts.PACKET_SIZE)]

# 各 TS パケットを処理する
for ts_packet in ts_packets:
assert len(ts_packet) == ts.PACKET_SIZE, f'Packet size is not 188 bytes. ({len(ts_packet)} bytes)'
assert ts_packet[0] == VideoEncodingTask.SYNC_BYTE_INT, f'Invalid TS packet. sync_byte is not found. (0x{ts_packet[0]:02x})'

# TS パケットの PID を取得する
PID = ts.pid(ts_packet)
Expand Down Expand Up @@ -848,29 +897,28 @@ def __run(self, first_segment_index: int) -> None:
encoder_thread = threading.Thread(target=self.__runEncoder, args=(first_segment,))
encoder_thread.start()

while True:
# sync_byte を探す
is_sync_byte_found = self.__findTSPacketSyncByte(reader)
assert is_sync_byte_found is True, 'Failed to find sync byte.'

# 同期バイトを探す
isEOF = False
while True:
sync_byte: bytes = reader.read(1)
if sync_byte == ts.SYNC_BYTE:
break
elif sync_byte == b'':
# ファイルの終端に達した場合はループを抜けて終了する
isEOF = True
break
while True:

# ファイルの終端に達した場合はループを抜けて終了する
if isEOF is True:
# 速度向上のため 188 * 10000 バイトのチャンクで一気に読み込んだ後、188 バイトごとの TS パケットに分割して処理する
# ファイルの終端に到達したら (read() してもデータが取れなくなったら) ループを抜ける
chunk = reader.read(ts.PACKET_SIZE * 10000)
if chunk == b'':
break

# 速度向上のため 188 * 10000 バイトのチャンクで一気に読み込んだ後、188 バイトごとの TS パケットに分割して処理する
chunk = ts.SYNC_BYTE + reader.read((ts.PACKET_SIZE * 10000) - 1)
for ts_packet in [chunk[i:i + ts.PACKET_SIZE] for i in range(0, len(chunk), ts.PACKET_SIZE)]:
if len(ts_packet) != ts.PACKET_SIZE:
Logging.error(f'[Video: {self.video_stream.video_stream_id}] Packet size is not 188 bytes.')
continue
# 取得したチャンクを TS パケットごとに分割する
## 必ずしも 188 * 10000 バイト取得しているとは限らないが、188 バイトの倍数にはなっているはず
assert chunk[0] == VideoEncodingTask.SYNC_BYTE_INT, f'Invalid TS packet. sync_byte is not found. (0x{chunk[0]:02x})'
assert len(chunk) % ts.PACKET_SIZE == 0
ts_packets = [chunk[i:i + ts.PACKET_SIZE] for i in range(0, len(chunk), ts.PACKET_SIZE)]

# 各 TS パケットを処理する
for ts_packet in ts_packets:
assert len(ts_packet) == ts.PACKET_SIZE, f'Packet size is not 188 bytes. ({len(ts_packet)} bytes)'
assert ts_packet[0] == VideoEncodingTask.SYNC_BYTE_INT, f'Invalid TS packet. sync_byte is not found. (0x{ts_packet[0]:02x})'

# TS パケットの PID を取得する
PID = ts.pid(ts_packet)
Expand Down
12 changes: 5 additions & 7 deletions server/app/streams/VideoStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ class VideoStreamSegment:
# 上記の情報に基づいて切り出された HLS セグメントの MPEG-TS データを入れる Queue (エンコーダーが同期関数なので同期用の Queue にしている)
## 切り出す際に随時入れられた後、同時に稼働中のエンコーダーに投入するために取り出される
## None が入れられたらこれ以上データは入らないことを示す
## TS パケットは最大 10000 個までに制限されている (188 * 10000 = 1880000 バイト = 1.88 MB)
## Queue に TS パケットが溜まりすぎてもエンコーダーが追いつかないので、256 個以上溜まったら一旦読み取りを中断してエンコーダーに投入されるようにする
segment_ts_packet_queue: queue.Queue[bytes | None]

# HLS セグメントのエンコード済み MPEG-TS データが返る asyncio.Future
Expand All @@ -80,7 +78,7 @@ async def resetState(self) -> None:
asyncio.Future を初期化するにはイベントループ上でなければならないらしいので、このメソッドを非同期関数にしている
"""

self.segment_ts_packet_queue = queue.Queue(maxsize=188 * 10000)
self.segment_ts_packet_queue = queue.Queue()
self.encoded_segment_ts_future = asyncio.Future()
self.is_started = False
self.encode_status = 'Pending'
Expand All @@ -89,9 +87,9 @@ async def resetState(self) -> None:
class VideoStream:
""" ビデオストリームを管理するクラス """

# ビデオストリームの操作がなかった場合にタイムアウトするまでの時間 (秒)
# ビデオストリームが再生されていない場合にタイムアウトするまでの時間 (秒)
# この時間が経過すると、ビデオストリームのインスタンスは自動的に破棄される
VIDEO_STREAM_TIMEOUT: ClassVar[float] = float(30)
VIDEO_STREAM_TIMEOUT: ClassVar[float] = float(10) # 10 秒

# ビデオストリームのインスタンスが入る、ビデオストリーム ID をキーとした辞書
# この辞書にビデオストリームに関する全てのデータが格納されている
Expand Down Expand Up @@ -239,7 +237,7 @@ async def getVirtualPlaylist(self) -> str:
end_file_position = 0, # 仮の値
duration_seconds = 0, # 仮の値
frame_count = 1,
segment_ts_packet_queue = queue.Queue(maxsize=188 * 10000), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する
segment_ts_packet_queue = queue.Queue(), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する
encoded_segment_ts_future = asyncio.Future(), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する
))
is_first_keyframe_found = True
Expand Down Expand Up @@ -271,7 +269,7 @@ async def getVirtualPlaylist(self) -> str:
end_file_position = 0, # 仮の値
duration_seconds = 0, # 仮の値
frame_count = 1,
segment_ts_packet_queue = queue.Queue(maxsize=188 * 10000), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する
segment_ts_packet_queue = queue.Queue(), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する
encoded_segment_ts_future = asyncio.Future(), # dataclass 側に書くと全ての参照が同じになってしまうので毎回新たに生成する
))
segment_sequence += 1
Expand Down

0 comments on commit d72d6b2

Please sign in to comment.