Skip to content

Commit

Permalink
Merge pull request #44 from xtne6f/no-use-create-pipe-connection
Browse files Browse the repository at this point in the history
Update: [Server][EDCBTuner] ProactorEventLoop.create_pipe_connection() の使用をやめる
  • Loading branch information
tsukumijima authored May 30, 2023
2 parents 7b12657 + c980d3d commit aeb354a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 24 deletions.
6 changes: 3 additions & 3 deletions server/app/streams/LiveEncodingTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from app.models import Channel
from app.models import LiveStream
from app.utils import Logging
from app.utils.EDCB import EDCBTuner
from app.utils.EDCB import EDCBTuner, PipeStreamReader


class LiveEncodingTask:
Expand Down Expand Up @@ -529,7 +529,7 @@ async def run(self) -> None:
is_running: bool = True

# 放送波の MPEG2-TS を受信する StreamReader
stream_reader: asyncio.StreamReader | aiohttp.StreamReader
stream_reader: asyncio.StreamReader | PipeStreamReader | aiohttp.StreamReader

# EDCB のチューナーインスタンス (Mirakurun バックエンド利用時は常に None)
tuner: EDCBTuner | None = None
Expand Down Expand Up @@ -672,7 +672,7 @@ async def Reader():

# 受信した放送波が入るイテレータを作成
# R/W バッファ: 188B (TS Packet Size) * 256 = 48128B
async def GetIterator(stream_reader: asyncio.StreamReader | aiohttp.StreamReader, chunk_size: int = 48128) -> AsyncIterator[bytes]:
async def GetIterator(stream_reader: asyncio.StreamReader | PipeStreamReader | aiohttp.StreamReader, chunk_size: int = 48128) -> AsyncIterator[bytes]:
while True:
try:
yield await stream_reader.readexactly(chunk_size)
Expand Down
59 changes: 38 additions & 21 deletions server/app/utils/EDCB.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,35 @@
import sys
import time
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, cast, ClassVar

from app.constants import CONFIG


class PipeStreamReader:
"""
パイプのファイルオブジェクトを非同期で読み込むクラス
ProactorEventLoop のパイプサポートは未だ不十分で、ドキュメントされていない create_pipe_connection メソッドも
内部で Win32API の CreateFile に渡すフラグが不適切で使い物にならないためつなぎとして用意したもの
"""

def __init__(self, pipe: Any, executor: ThreadPoolExecutor, loop: Any):
self.__pipe = pipe
self.__executor = executor
self.__loop = loop
self.__buffer = bytearray()

async def readexactly(self, n: int) -> bytes:
self.__buffer.clear()
while len(self.__buffer) < n:
data = await self.__loop.run_in_executor(self.__executor, lambda: self.__pipe.read(n - len(self.__buffer)))
if len(data) == 0:
raise asyncio.IncompleteReadError(bytes(self.__buffer), None)
self.__buffer += data
return bytes(self.__buffer)


class EDCBTuner:
""" EDCB バックエンドのチューナーを制御するクラス """

Expand Down Expand Up @@ -173,34 +197,37 @@ async def open(self) -> bool:
return True


async def connect(self) -> asyncio.StreamReader | None:
async def connect(self) -> asyncio.StreamReader | PipeStreamReader | None:
"""
チューナーに接続し、放送波を受け取るための TCP ソケットまたは名前付きパイプを返す
Returns:
asyncio.StreamReader | None: TCP ソケットまたは名前付きパイプの StreamReader (取得できなかった場合は None を返す)
asyncio.StreamReader | PipeStreamReader | None: TCP ソケットまたは名前付きパイプの StreamReader (取得できなかった場合は None を返す)
"""

# プロセス ID が取得できている(チューナーが起動している)ことが前提
if self._edcb_process_id is None:
return None

stream_reader: asyncio.StreamReader | PipeStreamReader | None = None

# チューナーに接続する
if EDCBUtil.getEDCBHost() != 'edcb-namedpipe':
## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプの出力を、
## EpgTimerSrv の CtrlCmd インターフェイス (TCP API) 経由で受信するための TCP ソケット (StreamReader / StreamWriter)
result = await EDCBUtil.openViewStream(self._edcb_process_id)
stream_reader, stream_writer = (None, None) if result is None else result
else:
## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプ (StreamReader / StreamWriter)
result = await EDCBUtil.openPipeStream(self._edcb_process_id)
## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプ (PipeStreamReader)
stream_reader = await EDCBUtil.openPipeStream(self._edcb_process_id)
stream_writer = None

# チューナーへの接続に失敗した
## チューナーを閉じてからエラーを返す
if result is None:
if stream_reader is None:
await self.close() # チューナーを閉じる
return None

stream_reader, stream_writer = result
self._edcb_stream_writer = stream_writer

return stream_reader
Expand Down Expand Up @@ -402,8 +429,8 @@ async def openViewStream(process_id: int, timeout_sec: float = 10.0) -> tuple[as
return None

@staticmethod
async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> tuple[asyncio.StreamReader, asyncio.StreamWriter] | None:
""" システムに存在する SrvPipe ストリームを開き、StreamReader / StreamWriter を返す """
async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> PipeStreamReader | None:
""" システムに存在する SrvPipe ストリームを開く """
if sys.platform != 'win32':
raise NotImplementedError('Windows Only')

Expand All @@ -413,21 +440,11 @@ async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> tuple[as
while time.monotonic() < to:
# ポートは必ず 0 から 29 まで
for port in range(30):
# asyncio.ProactorEventLoop.create_pipe_connection() を使う (Windows 専用のプライベート API)
# ref: https://github.com/qwertyquerty/pypresence/blob/4.2.1/pypresence/baseclient.py#L105-L123
path = '\\\\.\\pipe\\SendTSTCP_' + str(port) + '_' + str(process_id)
reader = asyncio.StreamReader(loop=loop)
reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
try:
transport, _ = await cast(asyncio.ProactorEventLoop, loop).create_pipe_connection(lambda: reader_protocol, path)
writer = asyncio.StreamWriter(transport, reader_protocol, reader, loop)
return (reader, writer)
path = '\\\\.\\pipe\\SendTSTCP_' + str(port) + '_' + str(process_id)
pipe = open(path, mode = 'rb')
return PipeStreamReader(pipe, ThreadPoolExecutor(), loop)
except:
# TODO: エラーを解消できたら削除
import traceback
from app.utils import Logging
Logging.error('openPipeStream: failed to connect to ' + path)
Logging.error(traceback.format_exc())
pass
await asyncio.sleep(wait)
# 初期に成功しなければ見込みは薄いので問い合わせを疎にしていく
Expand Down

0 comments on commit aeb354a

Please sign in to comment.