Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate password before restoring backup #133647

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion homeassistant/components/backup/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
)
from .models import AgentBackup, Folder
from .store import BackupStore
from .util import make_backup_dir, read_backup
from .util import make_backup_dir, read_backup, validate_password


class IncorrectPasswordError(HomeAssistantError):
"""Raised when the password is incorrect."""


@dataclass(frozen=True, kw_only=True, slots=True)
Expand Down Expand Up @@ -1269,6 +1273,12 @@ async def async_restore_backup(

remove_after_restore = True

password_valid = await self._hass.async_add_executor_job(
validate_password, path, password
)
if not password_valid:
raise IncorrectPasswordError("The password provided is incorrect.")

def _write_restore_file() -> None:
"""Write the restore file."""
Path(self._hass.config.path(RESTORE_BACKUP_FILE)).write_text(
Expand Down
37 changes: 36 additions & 1 deletion homeassistant/components/backup/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
from typing import cast

import aiohttp
from securetar import SecureTarFile

from homeassistant.backup_restore import password_to_key
from homeassistant.core import HomeAssistant
from homeassistant.util.json import JsonObjectType, json_loads_object

from .const import BUF_SIZE
from .const import BUF_SIZE, LOGGER
from .models import AddonInfo, AgentBackup, Folder


Expand Down Expand Up @@ -71,6 +73,39 @@ def read_backup(backup_path: Path) -> AgentBackup:
)


def validate_password(path: Path, password: str | None) -> bool:
"""Validate the password."""
with tarfile.open(path, "r:", bufsize=BUF_SIZE) as backup_file:
compressed = False
ha_tar_name = "homeassistant.tar"
try:
ha_tar = backup_file.extractfile(ha_tar_name)
except KeyError:
compressed = True
ha_tar_name = "homeassistant.tar.gz"
try:
ha_tar = backup_file.extractfile(ha_tar_name)
except KeyError:
LOGGER.error("No homeassistant.tar or homeassistant.tar.gz found")
return False
try:
with SecureTarFile(
path, # Not used
gzip=compressed,
key=password_to_key(password) if password is not None else None,
mode="r",
fileobj=ha_tar,
):
# If we can read the tar file, the password is correct
return True
except tarfile.ReadError:
LOGGER.debug("Invalid password")
return False
except Exception: # noqa: BLE001
LOGGER.exception("Unexpected error validating password")
return False


async def receive_file(
hass: HomeAssistant, contents: aiohttp.BodyPartReader, path: Path
) -> None:
Expand Down
26 changes: 15 additions & 11 deletions homeassistant/components/backup/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .config import ScheduleState
from .const import DATA_MANAGER, LOGGER
from .manager import ManagerStateEvent
from .manager import IncorrectPasswordError, ManagerStateEvent
from .models import Folder


Expand Down Expand Up @@ -131,16 +131,20 @@ async def handle_restore(
msg: dict[str, Any],
) -> None:
"""Restore a backup."""
await hass.data[DATA_MANAGER].async_restore_backup(
msg["backup_id"],
agent_id=msg["agent_id"],
password=msg.get("password"),
restore_addons=msg.get("restore_addons"),
restore_database=msg["restore_database"],
restore_folders=msg.get("restore_folders"),
restore_homeassistant=msg["restore_homeassistant"],
)
connection.send_result(msg["id"])
try:
await hass.data[DATA_MANAGER].async_restore_backup(
msg["backup_id"],
agent_id=msg["agent_id"],
password=msg.get("password"),
restore_addons=msg.get("restore_addons"),
restore_database=msg["restore_database"],
restore_folders=msg.get("restore_folders"),
restore_homeassistant=msg["restore_homeassistant"],
)
except IncorrectPasswordError:
connection.send_error(msg["id"], "password_incorrect", "Incorrect password")
else:
connection.send_result(msg["id"])


@websocket_api.require_admin
Expand Down
11 changes: 11 additions & 0 deletions tests/components/backup/snapshots/test_websocket.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -2870,6 +2870,17 @@
# name: test_restore_remote_agent[remote_agents1-backups1].1
1
# ---
# name: test_restore_wrong_password
dict({
'error': dict({
'code': 'password_incorrect',
'message': 'Incorrect password',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_subscribe_event
dict({
'event': dict({
Expand Down
63 changes: 62 additions & 1 deletion tests/components/backup/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,9 @@ async def test_async_trigger_restore(
patch("pathlib.Path.open"),
patch("pathlib.Path.write_text") as mocked_write_text,
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
patch(
"homeassistant.components.backup.manager.validate_password"
) as validate_password_mock,
patch.object(BackupAgentTest, "async_download_backup") as download_mock,
):
download_mock.return_value.__aiter__.return_value = iter((b"backup data",))
Expand All @@ -1132,19 +1135,72 @@ async def test_async_trigger_restore(
restore_folders=None,
restore_homeassistant=restore_homeassistant,
)
backup_path = f"{hass.config.path()}/{dir}/abc123.tar"
expected_restore_file = json.dumps(
{
"path": f"{hass.config.path()}/{dir}/abc123.tar",
"path": backup_path,
"password": password,
"remove_after_restore": agent_id != LOCAL_AGENT_ID,
"restore_database": restore_database,
"restore_homeassistant": restore_homeassistant,
}
)
validate_password_mock.assert_called_once_with(Path(backup_path), password)
assert mocked_write_text.call_args[0][0] == expected_restore_file
assert mocked_service_call.called


async def test_async_trigger_restore_wrong_password(hass: HomeAssistant) -> None:
"""Test trigger restore."""
password = "hunter2"
manager = BackupManager(hass, CoreBackupReaderWriter(hass))
hass.data[DATA_MANAGER] = manager

await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await _setup_backup_platform(
hass,
domain="test",
platform=Mock(
async_get_backup_agents=AsyncMock(
return_value=[BackupAgentTest("remote", backups=[TEST_BACKUP_ABC123])]
),
spec_set=BackupAgentPlatformProtocol,
),
)
await manager.load_platforms()

local_agent = manager.backup_agents[LOCAL_AGENT_ID]
local_agent._backups = {TEST_BACKUP_ABC123.backup_id: TEST_BACKUP_ABC123}
local_agent._loaded_backups = True

with (
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text") as mocked_write_text,
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
patch(
"homeassistant.components.backup.manager.validate_password"
) as validate_password_mock,
):
validate_password_mock.return_value = False
with pytest.raises(
HomeAssistantError, match="The password provided is incorrect."
):
await manager.async_restore_backup(
TEST_BACKUP_ABC123.backup_id,
agent_id=LOCAL_AGENT_ID,
password=password,
restore_addons=None,
restore_database=True,
restore_folders=None,
restore_homeassistant=True,
)

backup_path = f"{hass.config.path()}/backups/abc123.tar"
validate_password_mock.assert_called_once_with(Path(backup_path), password)
mocked_write_text.assert_not_called()
mocked_service_call.assert_not_called()


@pytest.mark.parametrize(
("parameters", "expected_error"),
[
Expand Down Expand Up @@ -1191,6 +1247,11 @@ async def test_async_trigger_restore_wrong_parameters(

with (
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text") as mocked_write_text,
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
pytest.raises(HomeAssistantError, match=expected_error),
):
await manager.async_restore_backup(**(default_parameters | parameters))

mocked_write_text.assert_not_called()
mocked_service_call.assert_not_called()
48 changes: 46 additions & 2 deletions tests/components/backup/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

from __future__ import annotations

import tarfile
from unittest.mock import Mock, patch

import pytest

from homeassistant.components.backup import AddonInfo, AgentBackup, Folder
from homeassistant.components.backup.util import read_backup
from homeassistant.components.backup.util import read_backup, validate_password


@pytest.mark.parametrize(
Expand Down Expand Up @@ -83,6 +84,49 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
mock_path.stat.return_value.st_size = 1234

with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
mock_open_tar.return_value.__enter__.return_value.extractfile().read.return_value = backup_json_content
mock_open_tar.return_value.__enter__.return_value.extractfile.return_value.read.return_value = backup_json_content
backup = read_backup(mock_path)
assert backup == expected_backup


@pytest.mark.parametrize("password", [None, "hunter2"])
def test_validate_password(password: str | None) -> None:
"""Test validating a password."""
mock_path = Mock()

with (
patch("homeassistant.components.backup.util.tarfile.open"),
patch("homeassistant.components.backup.util.SecureTarFile"),
):
assert validate_password(mock_path, password) is True


@pytest.mark.parametrize("password", [None, "hunter2"])
@pytest.mark.parametrize("secure_tar_side_effect", [tarfile.ReadError, Exception])
def test_validate_password_wrong_password(
password: str | None, secure_tar_side_effect: Exception
) -> None:
"""Test validating a password."""
mock_path = Mock()

with (
patch("homeassistant.components.backup.util.tarfile.open"),
patch(
"homeassistant.components.backup.util.SecureTarFile",
) as mock_secure_tar,
):
mock_secure_tar.return_value.__enter__.side_effect = secure_tar_side_effect
assert validate_password(mock_path, password) is False


def test_validate_password_no_homeassistant() -> None:
"""Test validating a password."""
mock_path = Mock()

with (
patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar,
):
mock_open_tar.return_value.__enter__.return_value.extractfile.side_effect = (
KeyError
)
assert validate_password(mock_path, "hunter2") is False
40 changes: 39 additions & 1 deletion tests/components/backup/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ async def test_restore_local_agent(
with (
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text"),
patch("homeassistant.components.backup.manager.validate_password"),
):
await client.send_json_auto_id(
{
Expand Down Expand Up @@ -606,7 +607,11 @@ async def test_restore_remote_agent(
client = await hass_ws_client(hass)
await hass.async_block_till_done()

with patch("pathlib.Path.write_text"), patch("pathlib.Path.open"):
with (
patch("pathlib.Path.write_text"),
patch("pathlib.Path.open"),
patch("homeassistant.components.backup.manager.validate_password"),
):
await client.send_json_auto_id(
{
"type": "backup/restore",
Expand All @@ -618,6 +623,39 @@ async def test_restore_remote_agent(
assert len(restart_calls) == snapshot


async def test_restore_wrong_password(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
) -> None:
"""Test calling the restore command."""
await setup_backup_integration(
hass, with_hassio=False, backups={LOCAL_AGENT_ID: [TEST_BACKUP_ABC123]}
)
restart_calls = async_mock_service(hass, "homeassistant", "restart")

client = await hass_ws_client(hass)
await hass.async_block_till_done()

with (
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text"),
patch(
"homeassistant.components.backup.manager.validate_password",
return_value=False,
),
):
await client.send_json_auto_id(
{
"type": "backup/restore",
"backup_id": "abc123",
"agent_id": "backup.local",
}
)
assert await client.receive_json() == snapshot
assert len(restart_calls) == 0


@pytest.mark.parametrize(
"access_token_fixture_name",
["hass_access_token", "hass_supervisor_access_token"],
Expand Down