1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-05-19 14:18:53 +01:00
Files
supervisor/tests/api/test_backups.py
T
Stefan Agner 33ab5b55f8 Treat JobException as a client-side API error (#6777)
* Treat JobException as a client-side API error

Job condition guards (system not running, no free space, etc.) and
concurrency rejections (another job in flight) raised by the @Job
decorator are explicit precondition failures with descriptive messages,
not unexpected errors. JobException inheriting HassioError directly
meant api_process caught them in its HassioError branch — which since
#6739 logs them as unexpected and captures them to Sentry.

Inherit APIError instead so api_process surfaces these through its
APIError branch with the original message and skips the
unexpected-error path. Status stays at APIError's default 400, so the
API contract is unchanged.

Extended test_backup_immediate_errors to assert async_capture_exception
is not called for the freeze and free-space condition guards.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Silence too-many-ancestors on plugin job error mixins

The plugin-specific job error subclasses (CliJobError, ObserverJobError,
MulticastJobError, CoreDNSJobError, AudioJobError) cross pylint's
too-many-ancestors threshold once JobException inherits APIError. Add
the same `# pylint: disable=too-many-ancestors` already used on the
ResolutionNotFound subclasses with similar diamond inheritance.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Disable too-many-ancestors globally instead of per class

The pylint config already disables every other too-many-* rule "for the
sake of readability", but kept too-many-ancestors and forced inline
disables on diamond-inherited exception classes (the ResolutionNotFound
subclasses, and now five plugin job error mixins after the JobException
APIError change).

Add too-many-ancestors to the global disable list and drop all eight
inline annotations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 10:21:13 +02:00

1718 lines
59 KiB
Python

"""Test backups API."""
import asyncio
from pathlib import Path, PurePath
from shutil import copy
from typing import Any
from unittest.mock import ANY, AsyncMock, MagicMock, PropertyMock, patch
from aiodocker.containers import DockerContainer
from aiohttp import MultipartWriter
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
import pytest
from supervisor.addons.addon import App
from supervisor.backups.backup import Backup, BackupLocation
from supervisor.backups.manager import BackupManager
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.docker.manager import DockerAPI
from supervisor.exceptions import (
AppPrePostBackupCommandReturnedError,
AppsError,
BackupInvalidError,
HomeAssistantBackupError,
)
from supervisor.homeassistant.core import HomeAssistantCore
from supervisor.homeassistant.module import HomeAssistant
from supervisor.homeassistant.websocket import HomeAssistantWebSocket
from supervisor.jobs import SupervisorJob
from supervisor.mounts.mount import Mount
from supervisor.supervisor import Supervisor
from tests.common import get_fixture_path
from tests.const import TEST_ADDON_SLUG
@pytest.mark.usefixtures("mock_full_backup")
async def test_info(api_client: TestClient, coresys: CoreSys, tmp_path: Path):
"""Test info endpoint."""
copy(get_fixture_path("backup_example.tar"), tmp_path / "test_backup.tar")
resp = await api_client.get("/backups/info")
result = await resp.json()
assert result["data"]["days_until_stale"] == 30
assert len(result["data"]["backups"]) == 1
assert result["data"]["backups"][0]["slug"] == "test"
assert result["data"]["backups"][0]["content"]["homeassistant"] is True
assert len(result["data"]["backups"][0]["content"]["addons"]) == 1
assert result["data"]["backups"][0]["content"]["addons"][0] == "local_ssh"
assert result["data"]["backups"][0]["size"] == 0.01
assert result["data"]["backups"][0]["size_bytes"] == 10240
@pytest.mark.usefixtures("mock_full_backup")
async def test_backup_more_info(
api_client: TestClient, coresys: CoreSys, tmp_path: Path
):
"""Test info endpoint."""
copy(get_fixture_path("backup_example.tar"), tmp_path / "test_backup.tar")
resp = await api_client.get("/backups/test/info")
result = await resp.json()
assert result["data"]["slug"] == "test"
assert result["data"]["homeassistant"] == "2022.8.0"
assert len(result["data"]["addons"]) == 1
assert result["data"]["addons"][0] == {
"name": "SSH",
"size": 0,
"slug": "local_ssh",
"version": "1.0.0",
}
assert result["data"]["size"] == 0.01
assert result["data"]["size_bytes"] == 10240
assert result["data"]["homeassistant_exclude_database"] is False
@pytest.mark.usefixtures("mock_full_backup")
async def test_list(api_client: TestClient, coresys: CoreSys, tmp_path: Path):
"""Test list endpoint."""
copy(get_fixture_path("backup_example.tar"), tmp_path / "test_backup.tar")
resp = await api_client.get("/backups")
result = await resp.json()
assert len(result["data"]["backups"]) == 1
assert result["data"]["backups"][0]["slug"] == "test"
assert result["data"]["backups"][0]["content"]["homeassistant"] is True
assert len(result["data"]["backups"][0]["content"]["addons"]) == 1
assert result["data"]["backups"][0]["content"]["addons"][0] == "local_ssh"
assert result["data"]["backups"][0]["size"] == 0.01
assert result["data"]["backups"][0]["size_bytes"] == 10240
async def test_options(
api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
"""Test options endpoint."""
client, prefix = api_client_with_prefix
assert coresys.backups.days_until_stale == 30
with patch.object(type(coresys.backups), "save_data") as save_data:
await client.post(
f"{prefix}/backups/options",
json={
"days_until_stale": 10,
},
)
save_data.assert_called_once()
assert coresys.backups.days_until_stale == 10
@pytest.mark.parametrize(
("location", "backup_dir", "api_location"),
[
("backup_test", PurePath("mounts", "backup_test"), "backup_test"),
(None, PurePath("backup"), None),
("", PurePath("backup"), None),
(".local", PurePath("backup"), None),
],
)
@pytest.mark.usefixtures("path_extern", "mount_propagation", "mock_is_mount")
async def test_backup_to_location(
api_client: TestClient,
coresys: CoreSys,
location: str | None,
backup_dir: PurePath,
api_location: str | None,
tmp_supervisor_data: Path,
):
"""Test making a backup to a specific location with default mount."""
await coresys.mounts.load()
(coresys.config.path_mounts / "backup_test").mkdir()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
"/backups/new/full",
json={
"name": "Mount test",
"location": location,
},
)
result = await resp.json()
assert result["result"] == "ok"
slug = result["data"]["slug"]
assert (tmp_supervisor_data / backup_dir / f"{slug}.tar").exists()
resp = await api_client.get(f"/backups/{slug}/info")
result = await resp.json()
assert result["result"] == "ok"
assert result["data"]["location"] == api_location
@pytest.mark.usefixtures(
"tmp_supervisor_data", "path_extern", "mount_propagation", "mock_is_mount"
)
async def test_backup_to_default(api_client: TestClient, coresys: CoreSys):
"""Test making backup to default mount."""
await coresys.mounts.load()
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
"/backups/new/full",
json={"name": "Mount test"},
)
result = await resp.json()
assert result["result"] == "ok"
slug = result["data"]["slug"]
assert (mount_dir / f"{slug}.tar").exists()
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_api_freeze_thaw(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
ha_ws_client: AsyncMock,
):
"""Test manual freeze and thaw for external backup via API."""
client, prefix = api_client_with_prefix
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
await client.post(f"{prefix}/backups/freeze")
assert coresys.core.state == CoreState.FREEZE
await asyncio.sleep(0)
assert any(
call.args[0] == {"type": "backup/start"}
for call in ha_ws_client.async_send_command.call_args_list
)
ha_ws_client.async_send_command.reset_mock()
await client.post(f"{prefix}/backups/thaw")
assert coresys.core.state == CoreState.RUNNING
await asyncio.sleep(0)
assert any(
call.args[0] == {"type": "backup/end"}
for call in ha_ws_client.async_send_command.call_args_list
)
@pytest.mark.parametrize(
"partial_backup,exclude_db_setting",
[(False, True), (True, True), (False, False), (True, False)],
)
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_api_backup_exclude_database(
api_client: TestClient,
coresys: CoreSys,
partial_backup: bool,
exclude_db_setting: bool,
):
"""Test backups exclude the database when specified."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0")
coresys.homeassistant.backups_exclude_database = exclude_db_setting
json = {} if exclude_db_setting else {"homeassistant_exclude_database": True}
with patch.object(HomeAssistant, "backup") as backup:
if partial_backup:
resp = await api_client.post(
"/backups/new/partial", json={"homeassistant": True} | json
)
else:
resp = await api_client.post("/backups/new/full", json=json)
backup.assert_awaited_once_with(ANY, True)
assert resp.status == 200
async def _get_job_info(api_client: TestClient, job_id: str) -> dict[str, Any]:
"""Test background job progress and block until it is done."""
resp = await api_client.get(f"/jobs/{job_id}")
assert resp.status == 200
result = await resp.json()
return result["data"]
@pytest.mark.parametrize(
"backup_type,options",
[
("full", {}),
(
"partial",
{
"homeassistant": True,
"folders": ["addons/local", "media", "share", "ssl"],
},
),
],
)
@pytest.mark.usefixtures("path_extern")
async def test_api_backup_restore_background(
api_client: TestClient,
coresys: CoreSys,
backup_type: str,
options: dict[str, Any],
tmp_supervisor_data: Path,
supervisor_internet: AsyncMock,
):
"""Test background option on backup/restore APIs."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0")
(tmp_supervisor_data / "addons/local").mkdir(parents=True)
assert coresys.jobs.jobs == []
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={"background": True, "name": f"{backup_type} backup"} | options,
)
assert resp.status == 200
result = await resp.json()
job_id = result["data"]["job_id"]
assert (await _get_job_info(api_client, job_id))["done"] is False
while not (job := (await _get_job_info(api_client, job_id)))["done"]:
await asyncio.sleep(0)
assert job["name"] == f"backup_manager_{backup_type}_backup"
assert (backup_slug := job["reference"])
assert job["child_jobs"][0]["name"] == "backup_store_homeassistant"
assert job["child_jobs"][0]["reference"] == backup_slug
assert job["child_jobs"][1]["name"] == "backup_store_folders"
assert job["child_jobs"][1]["reference"] == backup_slug
assert {j["reference"] for j in job["child_jobs"][1]["child_jobs"]} == {
"addons/local",
"media",
"share",
"ssl",
}
with patch.object(HomeAssistantCore, "start"):
resp = await api_client.post(
f"/backups/{backup_slug}/restore/{backup_type}",
json={"background": True} | options,
)
assert resp.status == 200
result = await resp.json()
job_id = result["data"]["job_id"]
assert (await _get_job_info(api_client, job_id))["done"] is False
while not (job := (await _get_job_info(api_client, job_id)))["done"]:
await asyncio.sleep(0)
assert job["name"] == f"backup_manager_{backup_type}_restore"
assert job["reference"] == backup_slug
assert job["child_jobs"][0]["name"] == "backup_restore_folders"
assert job["child_jobs"][0]["reference"] == backup_slug
assert {j["reference"] for j in job["child_jobs"][0]["child_jobs"]} == {
"addons/local",
"media",
"share",
"ssl",
}
assert job["child_jobs"][1]["name"] == "backup_restore_homeassistant"
assert job["child_jobs"][1]["reference"] == backup_slug
if backup_type == "full":
assert job["child_jobs"][2]["name"] == "backup_remove_delta_addons"
assert job["child_jobs"][2]["reference"] == backup_slug
@pytest.mark.parametrize(
"backup_type,options",
[
("full", {}),
(
"partial",
{
"homeassistant": True,
"folders": ["addons/local", "media", "share", "ssl"],
"addons": ["local_ssh"],
},
),
],
)
@pytest.mark.usefixtures("install_app_ssh", "path_extern")
async def test_api_backup_errors(
api_client: TestClient,
coresys: CoreSys,
backup_type: str,
options: dict[str, Any],
tmp_supervisor_data: Path,
):
"""Test error reporting in backup job."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0")
(tmp_supervisor_data / "addons/local").mkdir(parents=True)
assert coresys.jobs.jobs == []
with patch.object(App, "backup", side_effect=(err := AppsError("Backup error"))):
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={"name": f"{backup_type} backup"} | options,
)
assert resp.status == 200
result = await resp.json()
job_id = result["data"]["job_id"]
slug = result["data"]["slug"]
job = await _get_job_info(api_client, job_id)
assert job["name"] == f"backup_manager_{backup_type}_backup"
assert job["done"] is True
assert job["reference"] == slug
assert job["errors"] == []
assert job["child_jobs"][0]["name"] == "backup_store_homeassistant"
assert job["child_jobs"][0]["reference"] == slug
assert job["child_jobs"][1]["name"] == "backup_store_addons"
assert job["child_jobs"][1]["reference"] == slug
assert job["child_jobs"][1]["child_jobs"][0]["name"] == "backup_addon_save"
assert job["child_jobs"][1]["child_jobs"][0]["reference"] == "local_ssh"
assert job["child_jobs"][1]["child_jobs"][0]["errors"] == [
{
"type": "BackupError",
"message": str(err),
"stage": None,
"error_key": None,
"extra_fields": None,
}
]
assert job["child_jobs"][2]["name"] == "backup_store_folders"
assert job["child_jobs"][2]["reference"] == slug
assert {j["reference"] for j in job["child_jobs"][2]["child_jobs"]} == {
"addons/local",
"media",
"share",
"ssl",
}
with (
patch.object(
HomeAssistant,
"backup",
side_effect=HomeAssistantBackupError("Backup error"),
),
patch.object(App, "backup"),
):
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={"name": f"{backup_type} backup"} | options,
)
assert resp.status == 400
result = await resp.json()
job_id = result["job_id"]
job = await _get_job_info(api_client, job_id)
assert job["name"] == f"backup_manager_{backup_type}_backup"
assert job["done"] is True
assert job["errors"] == [
{
"type": "HomeAssistantBackupError",
"message": "Backup error",
"stage": "home_assistant",
"error_key": None,
"extra_fields": None,
}
]
assert job["child_jobs"][0]["name"] == "backup_store_homeassistant"
assert job["child_jobs"][0]["errors"] == [
{
"type": "HomeAssistantBackupError",
"message": "Backup error",
"stage": None,
"error_key": None,
"extra_fields": None,
}
]
assert len(job["child_jobs"]) == 1
async def test_backup_immediate_errors(api_client: TestClient, coresys: CoreSys):
"""Test backup errors that return immediately even in background mode."""
with patch("supervisor.api.utils.async_capture_exception") as capture_exception:
await coresys.core.set_state(CoreState.FREEZE)
resp = await api_client.post(
"/backups/new/full",
json={"name": "Test", "background": True},
)
assert resp.status == 400
assert "freeze" in (await resp.json())["message"]
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 0.5
resp = await api_client.post(
"/backups/new/partial",
json={"name": "Test", "homeassistant": True, "background": True},
)
assert resp.status == 400
assert "not enough free space" in (await resp.json())["message"]
# Job condition failures are user/environment driven; api_process
# must not capture them to Sentry.
capture_exception.assert_not_called()
async def test_restore_immediate_errors(
request: pytest.FixtureRequest,
api_client: TestClient,
coresys: CoreSys,
mock_partial_backup: Backup,
supervisor_internet: AsyncMock,
):
"""Test restore errors that return immediately even in background mode."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
f"/backups/{mock_partial_backup.slug}/restore/full", json={"background": True}
)
assert resp.status == 400
assert "only a partial backup" in (await resp.json())["message"]
with (
patch.object(Backup, "validate_backup"),
patch.object(
Backup,
"supervisor_version",
new=PropertyMock(return_value=AwesomeVersion("2024.01.0")),
),
patch.object(
Supervisor,
"version",
new=PropertyMock(return_value=AwesomeVersion("2023.12.0")),
),
):
resp = await api_client.post(
f"/backups/{mock_partial_backup.slug}/restore/partial",
json={"background": True, "homeassistant": True},
)
assert resp.status == 400
assert "Must update supervisor" in (await resp.json())["message"]
with (
patch.object(
Backup,
"all_locations",
new={None: BackupLocation(path=Path("/"), protected=True, size_bytes=0)},
),
patch.object(
Backup,
"validate_backup",
side_effect=BackupInvalidError("Invalid password"),
),
):
resp = await api_client.post(
f"/backups/{mock_partial_backup.slug}/restore/partial",
json={"background": True, "homeassistant": True},
)
assert resp.status == 400
assert "Invalid password" in (await resp.json())["message"]
with (
patch.object(Backup, "validate_backup"),
patch.object(Backup, "homeassistant", new=PropertyMock(return_value=None)),
):
resp = await api_client.post(
f"/backups/{mock_partial_backup.slug}/restore/partial",
json={"background": True, "homeassistant": True},
)
assert resp.status == 400
assert "No Home Assistant" in (await resp.json())["message"]
resp = await api_client.post(
f"/backups/{mock_partial_backup.slug}/restore/partial",
json={"background": True, "folders": ["ssl"]},
)
assert resp.status == 404
assert "file does not exist" in (await resp.json())["message"]
@pytest.mark.parametrize(
("folder", "location"), [("backup", None), ("core/backup", ".cloud_backup")]
)
async def test_reload(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
tmp_supervisor_data: Path,
folder: str,
location: str | None,
):
"""Test backups reload."""
client, prefix = api_client_with_prefix
assert not coresys.backups.list_backups
backup_file = get_fixture_path("backup_example.tar")
copy(backup_file, tmp_supervisor_data / folder)
resp = await client.post(f"{prefix}/backups/reload")
assert resp.status == 200
assert len(coresys.backups.list_backups) == 1
assert (backup := coresys.backups.get("7fed74c8"))
assert backup.location == location
assert backup.locations == [location]
@pytest.mark.usefixtures("install_app_ssh")
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_cloud_backup_core_only(api_client: TestClient, mock_full_backup: Backup):
"""Test only core can access cloud backup location."""
resp = await api_client.post(
"/backups/new/full",
json={
"name": "Mount test",
"location": ".cloud_backup",
},
)
assert resp.status == 403
resp = await api_client.post(
"/backups/new/partial",
json={"name": "Test", "homeassistant": True, "location": ".cloud_backup"},
)
assert resp.status == 403
# pylint: disable-next=protected-access
mock_full_backup._locations = {
".cloud_backup": BackupLocation(
path=Path("/"), protected=False, size_bytes=10240
)
}
assert mock_full_backup.location == ".cloud_backup"
resp = await api_client.post(f"/backups/{mock_full_backup.slug}/restore/full")
assert resp.status == 403
resp = await api_client.post(
f"/backups/{mock_full_backup.slug}/restore/partial",
json={"homeassistant": True},
)
assert resp.status == 403
resp = await api_client.delete(f"/backups/{mock_full_backup.slug}")
assert resp.status == 403
resp = await api_client.get(f"/backups/{mock_full_backup.slug}/download")
assert resp.status == 403
async def test_upload_download(
api_client: TestClient, coresys: CoreSys, tmp_supervisor_data: Path
):
"""Test upload and download of a backup."""
# Capture our backup initially
backup_file = get_fixture_path("backup_example.tar")
backup = Backup(coresys, backup_file, "in", None)
await backup.load()
# Upload it and confirm it matches what we had
with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
mp.append(file)
resp = await api_client.post("/backups/new/upload", data=mp)
assert resp.status == 200
body = await resp.json()
assert body["data"]["slug"] == "7fed74c8"
assert backup == coresys.backups.get("7fed74c8")
# Download it and confirm it against the original again
resp = await api_client.get("/backups/7fed74c8/download")
assert resp.status == 200
out_file = tmp_supervisor_data / "backup_example.tar"
with out_file.open("wb") as out:
out.write(await resp.read())
out_backup = Backup(coresys, out_file, "out", None)
await out_backup.load()
assert backup == out_backup
@pytest.mark.usefixtures("path_extern", "tmp_supervisor_data")
@pytest.mark.parametrize(
("backup_type", "inputs", "locations"),
[
("full", {}, [None, ".cloud_backup"]),
("full", {}, [".cloud_backup", None]),
("partial", {"folders": ["ssl"]}, [None, ".cloud_backup"]),
("partial", {"folders": ["ssl"]}, [".cloud_backup", None]),
],
)
async def test_backup_to_multiple_locations(
api_client: TestClient,
coresys: CoreSys,
backup_type: str,
inputs: dict[str, Any],
locations: list[str | None],
):
"""Test making a backup to multiple locations."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={"name": "Multiple locations test", "location": locations} | inputs,
)
assert resp.status == 200
result = await resp.json()
assert result["result"] == "ok"
slug = result["data"]["slug"]
orig_backup = coresys.config.path_backup / f"{slug}.tar"
copy_backup = coresys.config.path_core_backup / f"{slug}.tar"
assert orig_backup.exists()
assert copy_backup.exists()
assert coresys.backups.get(slug).all_locations == {
None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
".cloud_backup": BackupLocation(
path=copy_backup, protected=False, size_bytes=10240
),
}
assert coresys.backups.get(slug).location is None
@pytest.mark.usefixtures("path_extern", "tmp_supervisor_data")
@pytest.mark.parametrize(
("backup_type", "inputs"), [("full", {}), ("partial", {"folders": ["ssl"]})]
)
async def test_backup_to_multiple_locations_error_on_copy(
api_client: TestClient,
coresys: CoreSys,
backup_type: str,
inputs: dict[str, Any],
):
"""Test making a backup to multiple locations that fails during copy stage."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch("supervisor.backups.manager.copy", side_effect=OSError):
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={
"name": "Multiple locations test",
"location": [None, ".cloud_backup"],
}
| inputs,
)
assert resp.status == 200
result = await resp.json()
assert result["result"] == "ok"
slug = result["data"]["slug"]
orig_backup = coresys.config.path_backup / f"{slug}.tar"
assert await coresys.run_in_executor(orig_backup.exists)
assert coresys.backups.get(slug).all_locations == {
None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
}
assert coresys.backups.get(slug).location is None
resp = await api_client.get("/jobs/info")
assert resp.status == 200
result = await resp.json()
assert result["data"]["jobs"][0]["name"] == f"backup_manager_{backup_type}_backup"
assert result["data"]["jobs"][0]["reference"] == slug
assert result["data"]["jobs"][0]["done"] is True
assert len(result["data"]["jobs"][0]["errors"]) == 0
# Errors during copy to additional location should be recorded in child jobs only.
assert (
result["data"]["jobs"][0]["child_jobs"][-1]["name"]
== "backup_copy_to_additional_locations"
)
assert (
result["data"]["jobs"][0]["child_jobs"][-1]["child_jobs"][0]["name"]
== "backup_copy_to_location"
)
assert (
result["data"]["jobs"][0]["child_jobs"][-1]["child_jobs"][0]["reference"]
== ".cloud_backup"
)
assert result["data"]["jobs"][0]["child_jobs"][-1]["child_jobs"][0]["errors"] == [
{
"type": "BackupError",
"message": "Could not copy backup to .cloud_backup due to: ",
"stage": None,
"error_key": None,
"extra_fields": None,
}
]
@pytest.mark.parametrize(
("backup_type", "inputs"), [("full", {}), ("partial", {"folders": ["ssl"]})]
)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_backup_with_extras(
api_client: TestClient,
coresys: CoreSys,
backup_type: str,
inputs: dict[str, Any],
):
"""Test backup including extra metdata."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={"name": "Extras test", "extra": {"user": "test", "scheduled": True}}
| inputs,
)
assert resp.status == 200
result = await resp.json()
assert result["result"] == "ok"
slug = result["data"]["slug"]
resp = await api_client.get(f"/backups/{slug}/info")
assert resp.status == 200
result = await resp.json()
assert result["result"] == "ok"
slug = result["data"]["extra"] == {"user": "test", "scheduled": True}
@pytest.mark.usefixtures("tmp_supervisor_data")
@pytest.mark.parametrize("local_location", ["", ".local"])
async def test_upload_to_multiple_locations(
api_client: TestClient, coresys: CoreSys, local_location: str
):
"""Test uploading a backup to multiple locations."""
backup_file = get_fixture_path("backup_example.tar")
with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
mp.append(file)
resp = await api_client.post(
f"/backups/new/upload?location={local_location}&location=.cloud_backup",
data=mp,
)
assert resp.status == 200
body = await resp.json()
assert body["data"]["slug"] == "7fed74c8"
orig_backup = coresys.config.path_backup / "7fed74c8.tar"
copy_backup = coresys.config.path_core_backup / "7fed74c8.tar"
assert orig_backup.exists()
assert copy_backup.exists()
assert coresys.backups.get("7fed74c8").all_locations == {
None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
".cloud_backup": BackupLocation(
path=copy_backup, protected=False, size_bytes=10240
),
}
assert coresys.backups.get("7fed74c8").location is None
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_upload_duplicate_backup_new_location(
api_client: TestClient, coresys: CoreSys
):
"""Test uploading a backup that already exists to a new location."""
backup_file = get_fixture_path("backup_example.tar")
orig_backup = Path(copy(backup_file, coresys.config.path_backup))
await coresys.backups.reload()
assert coresys.backups.get("7fed74c8").all_locations == {
None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
}
with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
mp.append(file)
resp = await api_client.post(
"/backups/new/upload?location=.cloud_backup", data=mp
)
assert resp.status == 200
body = await resp.json()
assert body["data"]["slug"] == "7fed74c8"
copy_backup = coresys.config.path_core_backup / "7fed74c8.tar"
assert orig_backup.exists()
assert copy_backup.exists()
assert coresys.backups.get("7fed74c8").all_locations == {
None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
".cloud_backup": BackupLocation(
path=copy_backup, protected=False, size_bytes=10240
),
}
assert coresys.backups.get("7fed74c8").location is None
@pytest.mark.parametrize(
("filename", "expected_status"),
[("good.tar", 200), ("../bad.tar", 400), ("bad", 400)],
)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_upload_with_filename(
api_client: TestClient, coresys: CoreSys, filename: str, expected_status: int
):
"""Test uploading a backup to multiple locations."""
backup_file = get_fixture_path("backup_example.tar")
with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
mp.append(file)
resp = await api_client.post(
f"/backups/new/upload?filename={filename}", data=mp
)
assert resp.status == expected_status
body = await resp.json()
if expected_status != 200:
assert (
body["message"]
== r"does not match regular expression ^[^\\\/]+\.tar$."
+ f" Got '{filename}'"
)
return
assert body["data"]["slug"] == "7fed74c8"
orig_backup = coresys.config.path_backup / filename
assert orig_backup.exists()
assert coresys.backups.get("7fed74c8").all_locations == {
None: BackupLocation(path=orig_backup, protected=False, size_bytes=10240),
}
assert coresys.backups.get("7fed74c8").location is None
@pytest.mark.parametrize(
("method", "url"),
[
("get", "/backups/bad/info"),
("delete", "/backups/bad"),
("post", "/backups/bad/restore/full"),
("post", "/backups/bad/restore/partial"),
("get", "/backups/bad/download"),
],
)
async def test_backup_not_found(
api_client_with_prefix: tuple[TestClient, str], method: str, url: str
):
"""Test backup not found error."""
client, prefix = api_client_with_prefix
resp = await client.request(method, f"{prefix}{url}")
assert resp.status == 404
resp = await resp.json()
assert resp["message"] == "Backup does not exist"
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_remove_backup_from_location(
api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
"""Test removing a backup from one location of multiple."""
client, prefix = api_client_with_prefix
backup_file = get_fixture_path("backup_example.tar")
location_1 = Path(copy(backup_file, coresys.config.path_backup))
location_2 = Path(copy(backup_file, coresys.config.path_core_backup))
await coresys.backups.reload()
assert (backup := coresys.backups.get("7fed74c8"))
assert backup.all_locations == {
None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
".cloud_backup": BackupLocation(
path=location_2, protected=False, size_bytes=10240
),
}
resp = await client.delete(
f"{prefix}/backups/7fed74c8", json={"location": ".cloud_backup"}
)
assert resp.status == 200
assert location_1.exists()
assert not location_2.exists()
assert coresys.backups.get("7fed74c8")
assert backup.all_locations == {
None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
}
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_remove_backup_file_not_found(
api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
"""Test removing a backup from one location of multiple."""
client, prefix = api_client_with_prefix
backup_file = get_fixture_path("backup_example.tar")
location = Path(copy(backup_file, coresys.config.path_backup))
await coresys.backups.reload()
assert (backup := coresys.backups.get("7fed74c8"))
assert backup.all_locations == {
None: BackupLocation(path=location, protected=False, size_bytes=10240),
}
location.unlink()
resp = await client.delete(f"{prefix}/backups/7fed74c8")
assert resp.status == 404
body = await resp.json()
assert (
body["message"]
== f"Cannot delete backup at {str(location)}, file does not exist!"
)
@pytest.mark.parametrize("local_location", ["", ".local"])
async def test_download_backup_from_location(
api_client: TestClient,
coresys: CoreSys,
tmp_supervisor_data: Path,
local_location: str,
):
"""Test downloading a backup from a specific location."""
backup_file = get_fixture_path("backup_example.tar")
location_1 = Path(copy(backup_file, coresys.config.path_backup))
location_2 = Path(copy(backup_file, coresys.config.path_core_backup))
await coresys.backups.reload()
assert (backup := coresys.backups.get("7fed74c8"))
assert backup.all_locations == {
None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
".cloud_backup": BackupLocation(
path=location_2, protected=False, size_bytes=10240
),
}
# The use case of this is user might want to pick a particular mount if one is flaky
# To simulate this, remove the file from one location and show one works and the other doesn't
assert backup.location is None
location_2.unlink()
resp = await api_client.get("/backups/7fed74c8/download?location=.cloud_backup")
assert resp.status == 404
resp = await api_client.get(f"/backups/7fed74c8/download?location={local_location}")
assert resp.status == 200
out_file = tmp_supervisor_data / "backup_example.tar"
with out_file.open("wb") as out:
out.write(await resp.read())
out_backup = Backup(coresys, out_file, "out", None)
await out_backup.load()
assert backup == out_backup
@pytest.mark.usefixtures("mock_full_backup")
async def test_download_backup_from_invalid_location(
api_client_with_prefix: tuple[TestClient, str],
):
"""Test error for invalid download location."""
client, prefix = api_client_with_prefix
resp = await client.get(f"{prefix}/backups/test/download?location=.cloud_backup")
assert resp.status == 400
body = await resp.json()
assert body["message"] == "Backup test is not in location .cloud_backup"
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_partial_backup_all_apps(
api_client: TestClient,
coresys: CoreSys,
install_app_ssh: App,
):
"""Test backup including extra metdata."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(Backup, "store_apps") as store_apps:
resp = await api_client.post(
"/backups/new/partial", json={"name": "All addons test", "addons": "ALL"}
)
assert resp.status == 200
store_apps.assert_called_once_with([install_app_ssh])
@pytest.mark.parametrize("local_location", [None, "", ".local"])
async def test_restore_backup_from_location(
api_client: TestClient,
coresys: CoreSys,
tmp_supervisor_data: Path,
local_location: str | None,
supervisor_internet: AsyncMock,
):
"""Test restoring a backup from a specific location."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Make a backup and a file to test with
(test_file := coresys.config.path_share / "test.txt").touch()
resp = await api_client.post(
"/backups/new/partial",
json={
"name": "Test",
"folders": ["share"],
"location": [None, ".cloud_backup"],
},
)
assert resp.status == 200
body = await resp.json()
backup = coresys.backups.get(body["data"]["slug"])
assert set(backup.all_locations) == {None, ".cloud_backup"}
# The use case of this is user might want to pick a particular mount if one is flaky
# To simulate this, remove the file from one location and show one works and the other doesn't
assert backup.location is None
(backup_local_path := backup.all_locations[None].path).unlink()
test_file.unlink()
resp = await api_client.post(
f"/backups/{backup.slug}/restore/partial",
json={"location": local_location, "folders": ["share"]},
)
assert resp.status == 404
body = await resp.json()
assert (
body["message"]
== f"Cannot validate backup at {backup_local_path.as_posix()}, file does not exist!"
)
resp = await api_client.post(
f"/backups/{backup.slug}/restore/partial",
json={"location": ".cloud_backup", "folders": ["share"]},
)
assert resp.status == 200
assert test_file.is_file()
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_restore_backup_unencrypted_after_encrypted(
api_client: TestClient,
coresys: CoreSys,
supervisor_internet: AsyncMock,
):
"""Test restoring an unencrypted backup after an encrypted backup and vis-versa."""
enc_tar = copy(get_fixture_path("test_consolidate.tar"), coresys.config.path_backup)
unc_tar = copy(
get_fixture_path("test_consolidate_unc.tar"), coresys.config.path_core_backup
)
await coresys.backups.reload()
backup = coresys.backups.get("d9c48f8b")
assert backup.all_locations == {
None: BackupLocation(path=Path(enc_tar), protected=True, size_bytes=10240),
".cloud_backup": BackupLocation(
path=Path(unc_tar),
protected=False,
size_bytes=10240,
),
}
# pylint: disable=fixme
# TODO: There is a bug in the restore code that causes the restore to fail
# if the backup contains a Docker registry configuration and one location
# is encrypted and the other is not (just like our test fixture).
# We punt the ball on this one for this PR since this is a rare edge case.
backup.restore_dockerconfig = MagicMock()
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Restore encrypted backup
(test_file := coresys.config.path_ssl / "test.txt").touch()
resp = await api_client.post(
f"/backups/{backup.slug}/restore/partial",
json={"location": None, "password": "test", "folders": ["ssl"]},
)
assert resp.status == 200
body = await resp.json()
assert body["result"] == "ok"
assert not test_file.is_file()
# Restore unencrypted backup
test_file.touch()
resp = await api_client.post(
f"/backups/{backup.slug}/restore/partial",
json={"location": ".cloud_backup", "folders": ["ssl"]},
)
assert resp.status == 200
body = await resp.json()
assert body["result"] == "ok"
assert not test_file.is_file()
# Restore encrypted backup
test_file.touch()
resp = await api_client.post(
f"/backups/{backup.slug}/restore/partial",
json={"location": None, "password": "test", "folders": ["ssl"]},
)
assert resp.status == 200
body = await resp.json()
assert body["result"] == "ok"
assert not test_file.is_file()
@pytest.mark.parametrize(
("backup_type", "postbody"), [("partial", {"homeassistant": True}), ("full", {})]
)
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_restore_homeassistant_adds_env(
api_client: TestClient,
coresys: CoreSys,
docker: DockerAPI,
backup_type: str,
postbody: dict[str, Any],
supervisor_internet: AsyncMock,
):
"""Test restoring home assistant from backup adds env to container."""
event = asyncio.Event()
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2025.1.0")
backup = await coresys.backups.do_backup_full()
async def mock_async_send_message(_, message: dict[str, Any]):
"""Mock of async send message in ws client."""
if (
message["data"]["event"] == "job"
and message["data"]["data"]["name"]
== f"backup_manager_{backup_type}_restore"
and message["data"]["data"]["reference"] == backup.slug
and message["data"]["data"]["done"]
):
event.set()
with (
patch.object(HomeAssistantCore, "_block_till_run"),
patch.object(
HomeAssistantWebSocket,
"_async_send_command",
new=mock_async_send_message,
),
):
resp = await api_client.post(
f"/backups/{backup.slug}/restore/{backup_type}",
json={"background": True} | postbody,
)
assert resp.status == 200
body = await resp.json()
job = coresys.jobs.get_job(body["data"]["job_id"])
if not job.done:
await asyncio.wait_for(event.wait(), 5)
assert docker.containers.create.call_args.kwargs["name"] == "homeassistant"
assert (
f"SUPERVISOR_RESTORE_JOB_ID={job.uuid}"
in docker.containers.create.call_args.args[0]["Env"]
)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_backup_mixed_encryption(api_client: TestClient, coresys: CoreSys):
"""Test a backup with mixed encryption status across locations."""
enc_tar = copy(get_fixture_path("test_consolidate.tar"), coresys.config.path_backup)
unc_tar = copy(
get_fixture_path("test_consolidate_unc.tar"), coresys.config.path_core_backup
)
await coresys.backups.reload()
backup = coresys.backups.get("d9c48f8b")
assert backup.all_locations == {
None: BackupLocation(path=Path(enc_tar), protected=True, size_bytes=10240),
".cloud_backup": BackupLocation(
path=Path(unc_tar),
protected=False,
size_bytes=10240,
),
}
resp = await api_client.get("/backups")
assert resp.status == 200
body = await resp.json()
assert body["data"]["backups"][0]["slug"] == "d9c48f8b"
assert body["data"]["backups"][0]["location"] is None
assert body["data"]["backups"][0]["locations"] == [None]
assert body["data"]["backups"][0]["protected"] is True
assert body["data"]["backups"][0]["location_attributes"] == {
".local": {
"protected": True,
"size_bytes": 10240,
}
}
@pytest.mark.parametrize(
("backup_type", "options"), [("full", {}), ("partial", {"folders": ["ssl"]})]
)
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_protected_backup(
api_client: TestClient, coresys: CoreSys, backup_type: str, options: dict[str, Any]
):
"""Test creating a protected backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={"name": "test", "password": "test"} | options,
)
assert resp.status == 200
body = await resp.json()
assert (slug := body["data"]["slug"])
resp = await api_client.get("/backups")
assert resp.status == 200
body = await resp.json()
assert body["data"]["backups"][0]["slug"] == slug
assert body["data"]["backups"][0]["location"] is None
assert body["data"]["backups"][0]["locations"] == [None]
assert body["data"]["backups"][0]["protected"] is True
assert (
body["data"]["backups"][0]["location_attributes"][".local"]["protected"] is True
)
# NOTE: It is not safe to check size exactly here, as order of keys in
# `homeassistant.json` and potentially other random data (e.g. isntance UUID,
# backup slug) does change the size of the backup (due to gzip).
assert body["data"]["backups"][0]["location_attributes"][".local"]["size_bytes"] > 0
resp = await api_client.get(f"/backups/{slug}/info")
assert resp.status == 200
body = await resp.json()
assert body["data"]["location"] is None
assert body["data"]["locations"] == [None]
assert body["data"]["protected"] is True
assert body["data"]["location_attributes"][".local"]["protected"] is True
assert body["data"]["location_attributes"][".local"]["size_bytes"] > 0
@pytest.mark.usefixtures(
"path_extern", "mount_propagation", "mock_is_mount", "tmp_supervisor_data"
)
async def test_upload_to_mount(api_client: TestClient, coresys: CoreSys):
"""Test uploading a backup to a specific mount."""
await coresys.mounts.load()
(coresys.config.path_mounts / "backup_test").mkdir()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
await coresys.mounts.create_mount(mount)
# Capture our backup initially
backup_file = get_fixture_path("backup_example.tar")
backup = Backup(coresys, backup_file, "in", None)
await backup.load()
# Upload it and confirm it matches what we had
with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
mp.append(file)
resp = await api_client.post(
"/backups/new/upload?location=backup_test", data=mp
)
assert resp.status == 200
body = await resp.json()
assert body["data"]["slug"] == "7fed74c8"
assert backup == coresys.backups.get("7fed74c8")
@pytest.mark.parametrize(
("method", "url_path", "body", "backup_file"),
[
(
"delete",
"/backups/7fed74c8",
{"location": ".cloud_backup"},
"backup_example.tar",
),
(
"post",
"/backups/7fed74c8/restore/partial",
{"location": ".cloud_backup", "folders": ["ssl"]},
"backup_example.tar",
),
(
"get",
"/backups/7fed74c8/download?location=.cloud_backup",
None,
"backup_example.tar",
),
(
"post",
"/backups/93b462f8/restore/partial",
{"location": ".cloud_backup", "folders": ["ssl"], "password": "bad"},
"backup_example_enc.tar",
),
],
)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_missing_file_removes_location_from_cache(
api_client: TestClient,
coresys: CoreSys,
method: str,
url_path: str,
body: dict[str, Any] | None,
backup_file: str,
supervisor_internet: AsyncMock,
):
"""Test finding a missing file removes the location from cache."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path(backup_file)
copy(backup_file, coresys.config.path_backup)
bad_location = Path(copy(backup_file, coresys.config.path_core_backup))
await coresys.backups.reload()
assert len(coresys.backups.list_backups) == 1
slug = list(coresys.backups.list_backups)[0].slug
# After reload, remove one of the file and confirm we have an out of date cache
bad_location.unlink()
assert coresys.backups.get(slug).all_locations.keys() == {
None,
".cloud_backup",
}
resp = await api_client.request(method, url_path, json=body)
assert resp.status == 404
# Wait for reload task to complete and confirm location is removed
await asyncio.sleep(0.01)
assert coresys.backups.get(slug).all_locations.keys() == {None}
@pytest.mark.parametrize(
("method", "url_path", "body", "backup_file"),
[
("delete", "/backups/7fed74c8", {"location": ".local"}, "backup_example.tar"),
(
"post",
"/backups/7fed74c8/restore/partial",
{"location": ".local", "folders": ["ssl"]},
"backup_example.tar",
),
(
"get",
"/backups/7fed74c8/download?location=.local",
None,
"backup_example.tar",
),
(
"post",
"/backups/93b462f8/restore/partial",
{"location": ".local", "folders": ["ssl"], "password": "bad"},
"backup_example_enc.tar",
),
],
)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_missing_file_removes_backup_from_cache(
api_client: TestClient,
coresys: CoreSys,
method: str,
url_path: str,
body: dict[str, Any] | None,
backup_file: str,
supervisor_internet: AsyncMock,
):
"""Test finding a missing file removes the backup from cache if its the only one."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path(backup_file)
bad_location = Path(copy(backup_file, coresys.config.path_backup))
await coresys.backups.reload()
assert len(coresys.backups.list_backups) == 1
slug = list(coresys.backups.list_backups)[0].slug
# After reload, remove one of the file and confirm we have an out of date cache
bad_location.unlink()
assert coresys.backups.get(slug).all_locations.keys() == {None}
resp = await api_client.request(method, url_path, json=body)
assert resp.status == 404
# Wait for reload task to complete and confirm backup is removed
await asyncio.sleep(0.01)
assert not coresys.backups.list_backups
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_immediate_list_after_missing_file_restore(
api_client: TestClient,
coresys: CoreSys,
supervisor_internet: AsyncMock,
):
"""Test race with reload for missing file on restore does not error."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path("backup_example.tar")
bad_location = Path(copy(backup_file, coresys.config.path_backup))
# Copy a second backup in so there's something to reload later
copy(get_fixture_path("backup_example_enc.tar"), coresys.config.path_backup)
await coresys.backups.reload()
# After reload, remove one of the file and confirm we have an out of date cache
bad_location.unlink()
assert coresys.backups.get("7fed74c8").all_locations.keys() == {None}
event = asyncio.Event()
orig_wait = asyncio.wait
async def mock_wait(tasks: list[asyncio.Task], *args, **kwargs):
"""Mock for asyncio wait that allows force of race condition."""
if tasks[0].get_coro().__qualname__.startswith("BackupManager.reload"):
await event.wait()
return await orig_wait(tasks, *args, **kwargs)
with patch("supervisor.backups.manager.asyncio.wait", new=mock_wait):
resp = await api_client.post(
"/backups/7fed74c8/restore/partial",
json={"location": ".local", "folders": ["ssl"]},
)
assert resp.status == 404
await asyncio.sleep(0)
resp = await api_client.get("/backups")
assert resp.status == 200
result = await resp.json()
assert len(result["data"]["backups"]) == 2
event.set()
await asyncio.sleep(0.1)
resp = await api_client.get("/backups")
assert resp.status == 200
result = await resp.json()
assert len(result["data"]["backups"]) == 1
assert result["data"]["backups"][0]["slug"] == "93b462f8"
@pytest.mark.parametrize("command", ["backup_pre", "backup_post"])
@pytest.mark.usefixtures("install_app_example", "tmp_supervisor_data")
async def test_pre_post_backup_command_error(
api_client: TestClient, coresys: CoreSys, container: DockerContainer, command: str
):
"""Test pre/post backup command error."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
container.show.return_value["State"]["Status"] = "running"
container.show.return_value["State"]["Running"] = True
container.exec.return_value.inspect.return_value = {"ExitCode": 1}
with patch.object(App, command, new=PropertyMock(return_value="test")):
resp = await api_client.post(
"/backups/new/partial", json={"addons": ["local_example"]}
)
assert resp.status == 200
body = await resp.json()
job_id = body["data"]["job_id"]
job: SupervisorJob | None = None
for j in coresys.jobs.jobs:
if j.name == "backup_store_addons" and j.parent_id == job_id:
job = j
break
assert job
assert job.done is True
assert job.errors[0].type_ == AppPrePostBackupCommandReturnedError
assert job.errors[0].message == (
"Pre-/Post backup command for app local_example returned error code: "
"1. Please report this to the app developer. Enable debug "
"logging to capture complete command output using ha supervisor options --logging debug"
)
assert job.errors[0].error_key == "addon_pre_post_backup_command_returned_error"
assert job.errors[0].extra_fields == {
"addon": "local_example",
"exit_code": 1,
"debug_logging_command": "ha supervisor options --logging debug",
}
async def test_restore_partial_with_addons_key(
api_client: TestClient,
coresys: CoreSys,
mock_partial_backup: Backup,
):
"""Test that partial restore accepts 'addons' key in request body and remaps it to 'apps'."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
BackupManager, "do_restore_partial", return_value=True
) as mock_restore:
resp = await api_client.post(
f"/backups/{mock_partial_backup.slug}/restore/partial",
json={"addons": ["local_ssh"]},
)
assert resp.status == 200
mock_restore.assert_called_once()
_, call_kwargs = mock_restore.call_args
assert "apps" in call_kwargs
assert call_kwargs["apps"] == ["local_ssh"]
assert "addons" not in call_kwargs
# ── V2 API tests ──────────────────────────────────────────────────────────────
async def test_v2_api_disabled_without_feature_flag(api_client: TestClient):
"""V2 API routes return 404 when SUPERVISOR_V2_API feature flag is not enabled."""
resp = await api_client.get("/v2/backups")
assert resp.status == 404
resp = await api_client.get("/v2/backups/info")
assert resp.status == 404
resp = await api_client.get("/v2/apps")
assert resp.status == 404
resp = await api_client.get("/v2/store")
assert resp.status == 404
@pytest.mark.usefixtures("mock_full_backup")
async def test_v2_info_uses_apps_key(
api_client_v2: TestClient, coresys: CoreSys, tmp_path: Path
):
"""V2 /backups/info returns content with 'apps' key (not 'addons')."""
copy(get_fixture_path("backup_example.tar"), tmp_path / "test_backup.tar")
resp = await api_client_v2.get("/v2/backups/info")
result = await resp.json()
assert result["result"] == "ok"
assert len(result["data"]["backups"]) == 1
content = result["data"]["backups"][0]["content"]
assert "apps" in content
assert "addons" not in content
assert content["apps"] == ["local_ssh"]
@pytest.mark.usefixtures("mock_full_backup")
async def test_v2_list_uses_apps_key(
api_client_v2: TestClient, coresys: CoreSys, tmp_path: Path
):
"""V2 /backups returns content with 'apps' key (not 'addons')."""
copy(get_fixture_path("backup_example.tar"), tmp_path / "test_backup.tar")
resp = await api_client_v2.get("/v2/backups")
result = await resp.json()
assert result["result"] == "ok"
content = result["data"]["backups"][0]["content"]
assert "apps" in content
assert "addons" not in content
assert content["apps"] == ["local_ssh"]
@pytest.mark.usefixtures("mock_full_backup")
async def test_v2_backup_info_uses_apps_key(
api_client_v2: TestClient, coresys: CoreSys, tmp_path: Path
):
"""V2 /backups/{slug}/info returns 'apps' key at top level (not 'addons')."""
copy(get_fixture_path("backup_example.tar"), tmp_path / "test_backup.tar")
resp = await api_client_v2.get("/v2/backups/test/info")
result = await resp.json()
assert result["result"] == "ok"
assert "apps" in result["data"]
assert "addons" not in result["data"]
assert result["data"]["apps"][0]["slug"] == "local_ssh"
async def test_v2_backup_partial_accepts_apps_key(
api_client_v2: TestClient,
coresys: CoreSys,
mock_partial_backup: Backup,
):
"""V2 /backups/new/partial accepts 'apps' key in request body."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
BackupManager, "do_backup_partial", return_value=mock_partial_backup
) as mock_backup:
resp = await api_client_v2.post(
"/v2/backups/new/partial",
json={"apps": ["local_ssh"]},
)
assert resp.status == 200
mock_backup.assert_called_once()
_, call_kwargs = mock_backup.call_args
assert "apps" in call_kwargs
assert call_kwargs["apps"] == ["local_ssh"]
async def test_v2_backup_partial_rejects_addons_key(
api_client_v2: TestClient,
coresys: CoreSys,
):
"""V2 /backups/new/partial does not accept 'addons' key (v2 is strict)."""
await coresys.core.set_state(CoreState.RUNNING)
resp = await api_client_v2.post(
"/v2/backups/new/partial",
json={"addons": ["local_ssh"]},
)
# "addons" key is rejected in v2
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert "addons" in result["message"]
async def test_v2_restore_partial_accepts_apps_key(
api_client_v2: TestClient,
coresys: CoreSys,
mock_partial_backup: Backup,
):
"""V2 restore/partial accepts 'apps' key and passes it through without remapping."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
BackupManager, "do_restore_partial", return_value=True
) as mock_restore:
resp = await api_client_v2.post(
f"/v2/backups/{mock_partial_backup.slug}/restore/partial",
json={"apps": ["local_ssh"]},
)
assert resp.status == 200
mock_restore.assert_called_once()
_, call_kwargs = mock_restore.call_args
assert "apps" in call_kwargs
assert call_kwargs["apps"] == ["local_ssh"]
assert "addons" not in call_kwargs