1
0
mirror of https://github.com/home-assistant/core.git synced 2026-02-27 21:35:48 +00:00
Files
core/tests/test_backup_restore.py

484 lines
16 KiB
Python

"""Test methods in backup_restore."""
import json
from pathlib import Path
import tarfile
from typing import Any
from unittest import mock
import pytest
from homeassistant import backup_restore
from .common import get_fixture_path
def restore_result_file_content(config_dir: Path) -> dict[str, Any] | None:
"""Return the content of the restore result file."""
try:
return json.loads((config_dir / ".HA_RESTORE_RESULT").read_text("utf-8"))
except FileNotFoundError:
return None
@pytest.mark.parametrize(
("restore_config", "expected", "restore_result"),
[
(
"restore1.json", # Empty file, so JSONDecodeError is expected
None,
{
"success": False,
"error": "Expecting value: line 1 column 1 (char 0)",
"error_type": "JSONDecodeError",
},
),
(
"restore2.json", # File missing the 'password' key, so KeyError is expected
None,
{"success": False, "error": "'password'", "error_type": "KeyError"},
),
(
"restore3.json", # Valid file
backup_restore.RestoreBackupFileContent(
backup_file_path=Path("test"),
password="psw",
remove_after_restore=False,
restore_database=False,
restore_homeassistant=True,
),
None,
),
(
"restore4.json", # Valid file
backup_restore.RestoreBackupFileContent(
backup_file_path=Path("test"),
password=None,
remove_after_restore=True,
restore_database=True,
restore_homeassistant=False,
),
None,
),
],
)
def test_reading_the_instruction_contents(
restore_config: str,
expected: backup_restore.RestoreBackupFileContent | None,
restore_result: dict[str, Any] | None,
tmp_path: Path,
) -> None:
"""Test reading the content of the .HA_RESTORE file."""
get_fixture_path(f"core/backup_restore/{restore_config}", None).copy(
tmp_path / ".HA_RESTORE"
)
restore_file_path = tmp_path / ".HA_RESTORE"
assert restore_file_path.exists()
read_content = backup_restore.restore_backup_file_content(tmp_path)
assert read_content == expected
assert not restore_file_path.exists()
assert restore_result_file_content(tmp_path) == restore_result
def test_reading_the_instruction_contents_missing(tmp_path: Path) -> None:
"""Test reading the content of the .HA_RESTORE file when it is missing."""
assert not (tmp_path / ".HA_RESTORE").exists()
read_content = backup_restore.restore_backup_file_content(tmp_path)
assert read_content is None
assert not (tmp_path / ".HA_RESTORE").exists()
assert restore_result_file_content(tmp_path) is None
@pytest.mark.parametrize(
("restore_config"),
[
"restore3.json",
"restore4.json",
],
)
def test_restoring_backup_that_does_not_exist(
restore_config: str, tmp_path: Path
) -> None:
"""Test restoring a backup that does not exist."""
get_fixture_path(f"core/backup_restore/{restore_config}", None).copy(
tmp_path / ".HA_RESTORE"
)
restore_file_path = tmp_path / ".HA_RESTORE"
assert restore_file_path.exists()
with (
pytest.raises(ValueError, match="Backup file test does not exist"),
):
assert backup_restore.restore_backup(tmp_path.as_posix()) is False
assert restore_result_file_content(tmp_path) == {
"error": "Backup file test does not exist",
"error_type": "ValueError",
"success": False,
}
@pytest.mark.parametrize(
("restore_config", "restore_result"),
[
(
"restore1.json", # Empty file, so JSONDecodeError is expected
{
"success": False,
"error": "Expecting value: line 1 column 1 (char 0)",
"error_type": "JSONDecodeError",
},
),
(
"restore2.json", # File missing the 'password' key, so KeyError is expected
{"success": False, "error": "'password'", "error_type": "KeyError"},
),
],
)
def test_restoring_backup_when_instructions_can_not_be_read(
restore_config: str, restore_result: dict[str, Any], tmp_path: Path
) -> None:
"""Test restoring a backup when instructions can not be read."""
get_fixture_path(f"core/backup_restore/{restore_config}", None).copy(
tmp_path / ".HA_RESTORE"
)
restore_file_path = tmp_path / ".HA_RESTORE"
assert restore_file_path.exists()
assert backup_restore.restore_backup(tmp_path.as_posix()) is False
assert not restore_file_path.exists()
assert restore_result_file_content(tmp_path) == restore_result
def test_restoring_backup_when_instructions_missing(tmp_path: Path) -> None:
"""Test restoring a backup when instructions are missing."""
restore_file_path = tmp_path / ".HA_RESTORE"
assert not restore_file_path.exists()
assert backup_restore.restore_backup(tmp_path.as_posix()) is False
assert not restore_file_path.exists()
assert restore_result_file_content(tmp_path) is None
@pytest.mark.parametrize(
("restore_config"),
[
"restore3.json",
"restore4.json",
],
)
def test_restoring_backup_that_is_not_a_file(
restore_config: str, tmp_path: Path
) -> None:
"""Test restoring a backup that is not a file."""
backup_file_path = tmp_path / "test"
restore_file_path = tmp_path / ".HA_RESTORE"
# Set up restore file to point to a file within the temporary directory
restore_config = json.load(
get_fixture_path(f"core/backup_restore/{restore_config}", None).open(
"r", encoding="utf-8"
)
)
restore_config["path"] = backup_file_path.as_posix()
json.dump(restore_config, restore_file_path.open("w", encoding="utf-8"))
assert restore_file_path.exists()
# Create a directory at the backup file path to simulate the backup file not being a file
backup_file_path.mkdir(exist_ok=True)
with (
pytest.raises(IsADirectoryError, match="\\[Errno 21\\] Is a directory"),
):
assert backup_restore.restore_backup(tmp_path.as_posix()) is False
restore_result = restore_result_file_content(tmp_path)
assert restore_result == {
"error": mock.ANY,
"error_type": "IsADirectoryError",
"success": False,
}
assert restore_result["error"].startswith("[Errno 21] Is a directory:")
@pytest.mark.parametrize(
("restore_config"),
[
"restore3.json",
"restore4.json",
],
)
def test_aborting_for_older_versions(restore_config: str, tmp_path: Path) -> None:
"""Test that we abort for older versions."""
backup_file_path = tmp_path / "backup_from_future.tar"
restore_file_path = tmp_path / ".HA_RESTORE"
# Set up restore file to point to a file within the temporary directory
restore_config = json.load(
get_fixture_path(f"core/backup_restore/{restore_config}", None).open(
"r", encoding="utf-8"
)
)
restore_config["path"] = backup_file_path.as_posix()
json.dump(restore_config, restore_file_path.open("w", encoding="utf-8"))
assert restore_file_path.exists()
get_fixture_path("core/backup_restore/backup_from_future.tar", None).copy_into(
tmp_path
)
with (
pytest.raises(
ValueError,
match="You need at least Home Assistant version 9999.99.99 to restore this backup",
),
):
assert backup_restore.restore_backup(tmp_path.as_posix()) is True
assert restore_result_file_content(tmp_path) == {
"error": (
"You need at least Home Assistant version 9999.99.99 to restore this backup"
),
"error_type": "ValueError",
"success": False,
}
@pytest.mark.parametrize(
(
"restore_backup_content",
"expected_kept_files",
"expected_restored_files",
"expected_directories_after_restore",
),
[
(
backup_restore.RestoreBackupFileContent(
backup_file_path=None,
password=None,
remove_after_restore=False,
restore_database=True,
restore_homeassistant=True,
),
{"backups/test.tar"},
{"home-assistant_v2.db", "home-assistant_v2.db-wal"},
{"backups"},
),
(
backup_restore.RestoreBackupFileContent(
backup_file_path=None,
password=None,
restore_database=False,
remove_after_restore=False,
restore_homeassistant=True,
),
{"backups/test.tar", "home-assistant_v2.db", "home-assistant_v2.db-wal"},
set(),
{"backups"},
),
(
backup_restore.RestoreBackupFileContent(
backup_file_path=None,
password=None,
restore_database=True,
remove_after_restore=False,
restore_homeassistant=False,
),
{".HA_RESTORE", ".HA_VERSION", "backups/test.tar"},
{"home-assistant_v2.db", "home-assistant_v2.db-wal"},
{"backups", "tmp_backups", "www"},
),
],
)
def test_restore_backup(
restore_backup_content: backup_restore.RestoreBackupFileContent,
expected_kept_files: set[str],
expected_restored_files: set[str],
expected_directories_after_restore: set[str],
tmp_path: Path,
) -> None:
"""Test restoring a backup.
This includes checking that expected files are kept, restored, and
that we are cleaning up the current configuration directory.
"""
backup_file_path = tmp_path / "backups" / "test.tar"
def get_files(path: Path) -> set[str]:
"""Get all files under path."""
return {str(f.relative_to(path)) for f in path.rglob("*")}
existing_dirs = {
"backups",
"tmp_backups",
"www",
}
existing_files = {
".HA_RESTORE",
".HA_VERSION",
"home-assistant_v2.db",
"home-assistant_v2.db-wal",
}
for d in existing_dirs:
(tmp_path / d).mkdir(exist_ok=True)
for f in existing_files:
(tmp_path / f).write_text("before_restore")
get_fixture_path(
"core/backup_restore/empty_backup_database_included.tar", None
).copy(backup_file_path)
files_before_restore = get_files(tmp_path)
assert files_before_restore == {
".HA_RESTORE",
".HA_VERSION",
"backups",
"backups/test.tar",
"home-assistant_v2.db",
"home-assistant_v2.db-wal",
"tmp_backups",
"www",
}
kept_files_data = {}
for file in expected_kept_files:
kept_files_data[file] = (tmp_path / file).read_bytes()
restore_backup_content.backup_file_path = backup_file_path
with (
mock.patch(
"homeassistant.backup_restore.restore_backup_file_content",
return_value=restore_backup_content,
),
):
assert backup_restore.restore_backup(tmp_path.as_posix()) is True
files_after_restore = get_files(tmp_path)
assert (
files_after_restore
== {".HA_RESTORE_RESULT"}
| expected_kept_files
| expected_restored_files
| expected_directories_after_restore
)
for d in expected_directories_after_restore:
assert (tmp_path / d).is_dir()
for file in expected_kept_files:
assert (tmp_path / file).read_bytes() == kept_files_data[file]
for file in expected_restored_files:
assert (tmp_path / file).read_bytes() == b"restored_from_backup"
assert restore_result_file_content(tmp_path) == {
"error": None,
"error_type": None,
"success": True,
}
def test_restore_backup_filter_files(tmp_path: Path) -> None:
"""Test filtering dangerous files when restoring a backup."""
backup_file_path = tmp_path / "backups" / "test.tar"
backup_file_path.parent.mkdir()
get_fixture_path(
"core/backup_restore/empty_backup_database_included.tar", None
).copy(backup_file_path)
with (
tarfile.open(backup_file_path, "r") as outer_tar,
tarfile.open(
fileobj=outer_tar.extractfile("homeassistant.tar.gz"), mode="r|gz"
) as inner_tar,
):
member_names = {member.name for member in inner_tar.getmembers()}
assert member_names == {
".",
"../bad_file_with_parent_link",
"/bad_absolute_file",
"data",
"data/home-assistant_v2.db",
"data/home-assistant_v2.db-wal",
}
real_extractone = tarfile.TarFile._extract_one
with (
mock.patch(
"homeassistant.backup_restore.restore_backup_file_content",
return_value=backup_restore.RestoreBackupFileContent(
backup_file_path=backup_file_path,
password=None,
remove_after_restore=False,
restore_database=True,
restore_homeassistant=True,
),
),
mock.patch(
"tarfile.TarFile._extract_one", autospec=True, wraps=real_extractone
) as extractone_mock,
):
assert backup_restore.restore_backup(tmp_path.as_posix()) is True
# Check the unsafe files are not extracted, and that the safe files are extracted
extracted_files = {call.args[1].name for call in extractone_mock.mock_calls}
assert extracted_files == {
"./backup.json", # From the outer tar
"homeassistant.tar.gz", # From the outer tar
".",
"data",
"data/home-assistant_v2.db",
"data/home-assistant_v2.db-wal",
}
assert restore_result_file_content(tmp_path) == {
"error": None,
"error_type": None,
"success": True,
}
@pytest.mark.parametrize(("remove_after_restore"), [True, False])
def test_remove_backup_file_after_restore(
remove_after_restore: bool, tmp_path: Path
) -> None:
"""Test removing a backup file after restore."""
backup_file_path = tmp_path / "backups" / "test.tar"
backup_file_path.parent.mkdir()
get_fixture_path(
"core/backup_restore/empty_backup_database_included.tar", None
).copy(backup_file_path)
with (
mock.patch(
"homeassistant.backup_restore.restore_backup_file_content",
return_value=backup_restore.RestoreBackupFileContent(
backup_file_path=backup_file_path,
password=None,
remove_after_restore=remove_after_restore,
restore_database=True,
restore_homeassistant=True,
),
),
):
assert backup_restore.restore_backup(tmp_path.as_posix()) is True
assert backup_file_path.exists() == (not remove_after_restore)
assert restore_result_file_content(tmp_path) == {
"error": None,
"error_type": None,
"success": True,
}
@pytest.mark.parametrize(
("password", "expected"),
[
("test", b"\xf0\x9b\xb9\x1f\xdc,\xff\xd5x\xd6\xd6\x8fz\x19.\x0f"),
("lorem ipsum...", b"#\xe0\xfc\xe0\xdb?_\x1f,$\rQ\xf4\xf5\xd8\xfb"),
],
)
def test_pw_to_key(password: str | None, expected: bytes | None) -> None:
"""Test password to key conversion."""
assert backup_restore.password_to_key(password) == expected
def test_pw_to_key_none() -> None:
"""Test password to key conversion."""
with pytest.raises(AttributeError):
backup_restore.password_to_key(None)