-
-
Notifications
You must be signed in to change notification settings - Fork 31k
/
models.py
122 lines (94 loc) · 3.71 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Media Source models."""
from __future__ import annotations
from abc import ABC
from dataclasses import dataclass
from typing import Any, cast
from homeassistant.components.media_player import BrowseMedia, MediaClass, MediaType
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN, URI_SCHEME, URI_SCHEME_REGEX
@dataclass(slots=True)
class PlayMedia:
"""Represents a playable media."""
url: str
mime_type: str
class BrowseMediaSource(BrowseMedia):
    """Browsable media entry addressed by a media source URI.

    The ``media_content_id`` handed to ``BrowseMedia`` is assembled from
    ``URI_SCHEME``, the domain and, when one is given, the identifier.
    """

    def __init__(
        self, *, domain: str | None, identifier: str | None, **kwargs: Any
    ) -> None:
        """Build the content id and initialize the underlying browse media.

        ``domain`` and ``identifier`` are keyword-only and are stored on the
        instance; every other keyword argument is forwarded unchanged to
        ``BrowseMedia.__init__``.
        """
        # A missing domain contributes an empty string; a missing or empty
        # identifier contributes no path segment at all.
        path = f"/{identifier}" if identifier else ""
        content_id = f"{URI_SCHEME}{domain or ''}{path}"

        super().__init__(media_content_id=content_id, **kwargs)

        self.domain = domain
        self.identifier = identifier
@dataclass(slots=True)
class MediaSourceItem:
    """Media source URI split into its parts and bound to a hass instance."""

    hass: HomeAssistant
    # None selects the root listing of all registered media sources.
    domain: str | None
    identifier: str
    target_media_player: str | None

    async def async_browse(self) -> BrowseMediaSource:
        """Return the browse node for this item.

        With a domain set, browsing is delegated to the media source that
        owns the item. Without one, a root node is returned that has one
        child per source stored in ``hass.data[DOMAIN]``, ordered by title.
        """
        if self.domain is not None:
            return await self.async_media_source().async_browse_media(self)

        root = BrowseMediaSource(
            domain=None,
            identifier=None,
            media_class=MediaClass.APP,
            media_content_type=MediaType.APPS,
            title="Media Sources",
            can_play=False,
            can_expand=True,
            children_media_class=MediaClass.APP,
        )

        # One expandable, non-playable entry per registered media source.
        entries = [
            BrowseMediaSource(
                domain=source.domain,
                identifier=None,
                media_class=MediaClass.APP,
                media_content_type=MediaType.APP,
                thumbnail=f"https://brands.home-assistant.io/_/{source.domain}/logo.png",
                title=source.name,
                can_play=False,
                can_expand=True,
            )
            for source in self.hass.data[DOMAIN].values()
        ]
        entries.sort(key=lambda entry: entry.title)

        root.children = entries
        return root

    async def async_resolve(self) -> PlayMedia:
        """Ask the owning media source to resolve this item for playback."""
        owner = self.async_media_source()
        return await owner.async_resolve_media(self)

    @callback
    def async_media_source(self) -> MediaSource:
        """Look up the media source registered for this item's domain."""
        registered = self.hass.data[DOMAIN]
        return cast(MediaSource, registered[self.domain])

    @classmethod
    def from_uri(
        cls, hass: HomeAssistant, uri: str, target_media_player: str | None
    ) -> MediaSourceItem:
        """Parse a media source URI into an item.

        Raises ValueError when ``uri`` does not match ``URI_SCHEME_REGEX``.
        """
        parsed = URI_SCHEME_REGEX.match(uri)
        if not parsed:
            raise ValueError("Invalid media source URI")

        return cls(
            hass=hass,
            domain=parsed.group("domain"),
            identifier=parsed.group("identifier"),
            target_media_player=target_media_player,
        )
class MediaSource(ABC):
    """Base class for a provider of media files.

    The ``async_resolve_media`` and ``async_browse_media`` implementations
    here only raise ``NotImplementedError``; subclasses supply the real ones.
    """

    # Name of the source; falls back to the domain when unset or empty.
    name: str | None = None

    def __init__(self, domain: str) -> None:
        """Store the domain and default the name to it when none is set."""
        self.domain = domain
        if self.name:
            # A name was already provided (e.g. as a class attribute).
            return
        self.name = domain

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Turn a media item into something playable.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError

    async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
        """Return the browse node for a media item.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError