Skip to content

Commit

Permalink
Fix: [Server][VideoEncodingTask] 微妙にシークの高速化を図る
Browse files Browse the repository at this point in the history
初回シーク時に First PES packet is arrived. に到達するのが滅法遅いがなぜなんだ…
  • Loading branch information
tsukumijima committed Nov 24, 2023
1 parent 5acaffd commit b235b73
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions server/app/streams/VideoEncodingTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,10 @@ def __run(self, first_segment_index: int) -> None:
# 最後に取得した PID ごとの PES ヘッダー
latest_pes_headers: dict[int, PES] = {}

# インデックスが first_segment_index 以降のセグメントに絞った VideoStreamSegment のリスト
## 毎回呼び出すと遅いので高速化のために事前に作成しておく
filtered_segments = self.video_stream.segments[first_segment_index:]

# 現在処理中のセグメントのインデックス (HLS セグメントのシーケンス番号と一致する)
## self.video_stream.segments[monotonic_segment_index] が処理中のセグメントになる
## PTS がある PES パケットではこの値に関わらず PTS レンジに一致するセグメントに TS パケットが投入されるが、
Expand All @@ -826,9 +830,9 @@ def __run(self, first_segment_index: int) -> None:
with open(self.recorded_video.file_path, 'rb') as reader:
first_segment = self.video_stream.segments[first_segment_index]

# 余裕を持ってエンコードを開始する HLS セグメントのファイル上の位置 - 5 秒分の位置にシークする
# 余裕を持ってエンコードを開始する HLS セグメントのファイル上の位置 - 2 秒分の位置にシークする
## 正確にはシーク単位は 188 バイトずつでなければならないので 188 の倍数になるように調整する
seek_offset_bytes = ClosestMultiple(int(max(0, first_segment.start_file_position - (5 * BYTE_RATE))), ts.PACKET_SIZE)
seek_offset_bytes = ClosestMultiple(int(max(0, first_segment.start_file_position - (2 * BYTE_RATE))), ts.PACKET_SIZE)
reader.seek(seek_offset_bytes, os.SEEK_SET)
Logging.info(f'[Video: {self.video_stream.video_stream_id}] Seeked to {seek_offset_bytes} bytes.')

Expand Down Expand Up @@ -975,15 +979,15 @@ def __run(self, first_segment_index: int) -> None:
## TS は OpenGOP や送出タイミング (音声は映像より先行して送出されることが多い) の関係で
## 特定のファイル位置以前と以降の境目ではきれいに分割することができないため、
## 送出順 (符号化順) に関わらず PTS を基準に投入先のセグメントを振り分ける
for segment in self.video_stream.segments[first_segment_index:]:

# 現在の PES パケットの PTS を取得する
current_pts = pes_header.pts()
assert current_pts is not None
for segment in filtered_segments:

# 当該 PES が現在処理中のセグメントの切り出し範囲に含まれる
if self.__isPESPacketInSegment(pes_header, PID == VIDEO_PID, segment) is True:

# 現在の PES パケットの PTS を取得する
current_pts = pes_header.pts()
assert current_pts is not None

# 現在の PTS が前のセグメントの切り出し終了 PTS から 3 秒以上が経過している場合
if (segment.sequence_index - 1 >= first_segment_index) and \
(current_pts - self.video_stream.segments[segment.sequence_index - 1].end_pts >= 3 * ts.HZ):
Expand Down Expand Up @@ -1051,7 +1055,7 @@ def __run(self, first_segment_index: int) -> None:
# 当該セグメントの PTS レンジに一致する最初のパケットのみ
if segment.is_started is False:
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] '
'First TS packet is arrived.')
'First PES packet is arrived.')

# このタイミングで monotonic_segment_index が初期値の場合のみ、
# PTS が存在しないパケットがどのセグメントに TS パケットを投入すれば良いかのインデックスを切り替える
Expand All @@ -1076,7 +1080,7 @@ def __run(self, first_segment_index: int) -> None:
## インデックスは単調増加のため一度カウントアップしたらカウントが減ることはない
if PID == VIDEO_PID and segment.start_pts == current_pts and monotonic_segment_index < segment.sequence_index:
Logging.debug_simple(f'[Video: {self.video_stream.video_stream_id}][Segment {segment.sequence_index}] '
'First keyframe TS packet is arrived.')
'First keyframe PES packet is arrived.')
monotonic_segment_index = segment.sequence_index

# ここで Queue に投入したパケットがそのまま tsreadex → エンコーダーに投入される
Expand All @@ -1099,15 +1103,15 @@ def __run(self, first_segment_index: int) -> None:
## TS は OpenGOP や送出タイミング (音声は映像より先行して送出されることが多い) の関係で
## 特定のファイル位置以前と以降の境目ではきれいに分割することができないため、
## 送出順 (符号化順) に関わらず PTS を基準に投入先のセグメントを振り分ける
for segment in self.video_stream.segments[first_segment_index:]:

# 現在の PES パケットの PTS を取得する
current_pts = latest_pes_headers[PID].pts()
assert current_pts is not None
for segment in filtered_segments:

# 当該 PES が現在処理中のセグメントの切り出し範囲に含まれる
if self.__isPESPacketInSegment(latest_pes_headers[PID], PID == VIDEO_PID, segment) is True:

# 現在の PES パケットの PTS を取得する
current_pts = latest_pes_headers[PID].pts()
assert current_pts is not None

# 当該セグメントのエンコードがすでに完了している場合は何もしない
## 中間に数個だけ既にエンコードされているセグメントがあるケースでは、
## それらのエンコード完了済みセグメントの切り出し&エンコード処理をスキップして次のセグメントに進むことになる
Expand All @@ -1130,7 +1134,7 @@ def __run(self, first_segment_index: int) -> None:
pcr_value = cast(int, ts.pcr(ts_packet))

# 開始 PTS 〜 終了 PTS のレンジに一致するセグメントが持つ Queue に TS パケットを投入する
for segment in self.video_stream.segments[first_segment_index:]:
for segment in filtered_segments:

# 当該 PCR が現在処理中のセグメントの切り出し範囲に含まれる
if segment.start_pts <= pcr_value <= segment.end_pts:
Expand All @@ -1150,14 +1154,12 @@ def __run(self, first_segment_index: int) -> None:
## PAT / PMT は別途投入済みなのでここには含まれない
else:

# monotonic_segment_index が正の値である (初期値でない) ことを確認する
if monotonic_segment_index >= 0:

# 当該セグメントのエンコードが完了していない場合のみ、TS パケットを投入する
## 中間に数個だけ既にエンコードされているセグメントがあるケースでは、
## それらのエンコード完了済みセグメントの切り出し&エンコード処理をスキップして次のセグメントに進むことになる
if self.video_stream.segments[monotonic_segment_index].encode_status != 'Completed':
self.video_stream.segments[monotonic_segment_index].segment_ts_packet_queue.put(ts_packet)
# 事前に monotonic_segment_index が正の値である (初期値でない) ことを確認する
# 当該セグメントのエンコードが完了していない場合のみ、TS パケットを投入する
## 中間に数個だけ既にエンコードされているセグメントがあるケースでは、
## それらのエンコード完了済みセグメントの切り出し&エンコード処理をスキップして次のセグメントに進むことになる
if monotonic_segment_index >= 0 and self.video_stream.segments[monotonic_segment_index].encode_status != 'Completed':
self.video_stream.segments[monotonic_segment_index].segment_ts_packet_queue.put(ts_packet)

# 途中でエンコードタスクがキャンセルされた場合、処理中のセグメントがあるかに関わらずエンコードタスクを終了する
# このとき、エンコーダーの出力はエンコードの完了を待つことなく破棄され、セグメントは処理開始前の状態にリセットされる
Expand Down

0 comments on commit b235b73

Please sign in to comment.