mirror of https://github.com/home-assistant/supervisor.git synced 2026-05-22 15:48:51 +01:00
supervisor/tests/backups/test_manager.py
Stefan Agner 267fc6cd71 mounts: make is_mounted honest about server reachability (#6838)
* mounts: use softerr for NFS instead of soft

Switch the NFS mount option from `soft` to `softerr`. For HAOS-style
supervisor mounts (media, share, backup — not the root filesystem) the
error semantics matter:

* `softerr` returns `ETIMEDOUT` on timeout instead of `EIO` (`soft`).
  `EIO` is indistinguishable from "the disk is dying"; tools like
  SQLite, restic, rsync, ffmpeg tend to treat it as a hard storage
  failure (mark database corrupt, abort backup with a hard error,
  etc.). `ETIMEDOUT` is unambiguously "the network/server is gone,
  transient" and is more commonly handled as retry-later. Supervisor
  can also surface a clear "server unreachable" notification rather
  than a generic I/O error.

* `softerr` was added in kernel 5.2 precisely to give the fail-fast
  behavior of `soft` with a distinct errno so well-behaved apps can
  do the right thing.

* For writes-must-not-be-lost use cases (databases, paid storage,
  evidence-grade logging) one would want `hard,intr` and a different
  recovery story. HAOS NFS mounts are not that — they're add-on
  storage where "the share went offline, try again later" is the
  correct user-visible behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* mounts: make is_mounted honest about server reachability

systemd's "active/mounted" state is derived from /proc/self/mountinfo
and doesn't reflect whether the backing server actually answers. For
CIFS in particular, smb3_reconfigure never contacts the server, so a
reload of a dead share returns active/mounted with no recovery
attempted. PR #4882 added a Path.is_mount() cross-check to catch this,
but Path.is_mount() relies on os.stat() of the mountpoint root — and
both NFS (softreval) and CIFS (cached root inode attrs) can serve that
stat from local state without going to the wire, so it lies in exactly
the dead-share scenario it was meant to detect.

The visible failure: an API reload of a backup mount whose server had
gone away "succeeded", supervisor then kicked off sys_backups.reload()
against the dead share, and the executor fanned out hundreds of
"Task exception was never retrieved" OSError(112) tracebacks as each
backup tarball's stat() parked in the kernel and failed.

Replace the local-state checks with a statvfs() probe in
NetworkMount.is_mounted(). os.statvfs() returns per-filesystem data
(free blocks, total blocks) that has no client-side cache in either
kernel — neither cifs_statfs() nor nfs_statfs() has an early-return on
cache freshness; both build and send a real FSSTAT / QUERY_FS_INFO
request. So the kernel either reaches the server or gives up with
ETIMEDOUT / EHOSTDOWN / ECONNABORTED. is_mounted() now reflects actual
reachability, finally fulfilling PR #4882's stated intent.
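
As a rough illustration of the probe described above, here is a minimal
sketch. The function name `probe_reachable` and the log wording are
hypothetical and not taken from the supervisor code; only `os.statvfs()`
and the "any OSError means not reachable" idea come from the text.

```python
import errno
import logging
import os
import time

_LOGGER = logging.getLogger(__name__)


def probe_reachable(path: str) -> bool:
    """Return True if statvfs() on path succeeds, False on any OSError.

    Hypothetical helper: the real NetworkMount.is_mounted() may differ.
    """
    start = time.monotonic()
    try:
        # Per the text above, on NFS/CIFS this call is not served from a
        # client-side cache, so it either reaches the server or fails.
        os.statvfs(path)
    except OSError as err:
        name = errno.errorcode.get(err.errno, "unknown")
        _LOGGER.debug(
            "statvfs(%s) failed after %.2fs: %s",
            path,
            time.monotonic() - start,
            name,
        )
        return False
    _LOGGER.debug("statvfs(%s) ok after %.2fs", path, time.monotonic() - start)
    return True
```

On a local directory the call returns immediately; on a dead network share
it would block until the kernel-side timeout expires and then return False.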

With is_mounted() honest, the reload/restart machinery falls out
naturally: Mount.reload()'s fast path is just `if await self.is_mounted()`,
the post-reload check is the same call, and the "reload succeeded
systemd-wise but probe failed" branch collapses into the existing
"not mounted after reload, try restart" branch. mount(), _restart()
and update() already call is_mounted(); they now get the probe for
free.

To make the probe meaningful, the network mount option strings get
explicit kernel-side timeouts:

* NFS switches from `soft` to `softerr` so timeouts surface as
  ETIMEDOUT rather than EIO. EIO is indistinguishable from
  "disk dying" and gets misinterpreted by SQLite/restic/rsync as a
  hard storage failure; ETIMEDOUT is unambiguously transient.
* CIFS gains `soft,echo_interval=10,retrans=0`, giving a ~30s
  per-operation detection budget (3 x echo_interval since last server
  response) that matches the NFS budget from `timeo=100,retrans=2`.
  Both protocols now fail bounded operations in roughly the same time.
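
To make the errno distinction concrete, a caller could branch on it roughly
as follows. This is only a sketch: `classify_io_error` and the two labels
are hypothetical, and the set of "transient" errno values is an assumption
drawn from the errors named in this commit message.

```python
import errno

# Assumption: errno values this commit message associates with an
# unreachable server rather than failing storage.
TRANSIENT_ERRNOS = {errno.ETIMEDOUT, errno.EHOSTDOWN, errno.ECONNABORTED}


def classify_io_error(err: OSError) -> str:
    """Map an OSError to a coarse, illustrative handling category."""
    if err.errno in TRANSIENT_ERRNOS:
        # Server or network is gone: worth retrying later.
        return "retry-later"
    # Everything else (including EIO) is treated as a hard failure here.
    return "hard-failure"
```

With `soft` a timeout would surface as EIO and land in the second branch;
with `softerr` it surfaces as ETIMEDOUT and lands in the first.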

The probe is intentionally not wrapped in an asyncio timeout: the
kernel-side bound is authoritative, and an asyncio timeout would only
orphan the executor thread without unblocking the syscall. The probe
emits debug logs with timing so the ~30s syscall wait on a dead share
is visible in LOGLEVEL=debug traces instead of appearing as a hang.
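
The point about asyncio timeouts can be reproduced with a small standalone
sketch. `blocking_probe` is a hypothetical stand-in that uses `time.sleep()`
in place of a syscall parked in the kernel; the durations are arbitrary.

```python
import asyncio
import threading
import time


def blocking_probe(done: threading.Event, seconds: float) -> None:
    """Stand-in for a blocking syscall; Python cannot interrupt it."""
    time.sleep(seconds)
    done.set()


async def demo() -> tuple[bool, bool, bool]:
    loop = asyncio.get_running_loop()
    done = threading.Event()
    future = loop.run_in_executor(None, blocking_probe, done, 0.3)

    timed_out = False
    try:
        await asyncio.wait_for(future, timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True

    # The awaiting coroutine has moved on, but the worker thread has not.
    finished_at_timeout = done.is_set()

    # Wait (in another worker thread) until the orphaned call really ends.
    await asyncio.to_thread(done.wait, 2)
    return timed_out, finished_at_timeout, done.is_set()
```

The timeout only releases the awaiting coroutine; the executor thread keeps
running until the blocking call returns on its own.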

Tests:

* mock_is_mount fixture extended to also patch os.statvfs so existing
  tests that rely on a healthy mount don't need to know about the
  probe.
* New manager test split into _healthy_skips_systemd (probe succeeds,
  fast path, no systemd call) and _probe_failure_triggers_systemd_reload
  (probe fails, escalation runs). API reload test covers both paths.
* Existing tests that simulated "mount is down" via
  mock_is_mount.return_value=False updated to simulate probe failure
  via OSError(EHOSTDOWN), since is_mount is no longer the signal.
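
A minimal sketch of how a test can simulate probe failure by patching
`os.statvfs`, in the spirit of the fixture changes listed above. The
`probe` helper is hypothetical and stands in for the real mount check.

```python
import errno
import os
from unittest.mock import patch


def probe(path: str) -> bool:
    """Hypothetical stand-in for the statvfs-based reachability check."""
    try:
        os.statvfs(path)
    except OSError:
        return False
    return True


def demo() -> tuple[bool, bool]:
    # Simulate a dead server: statvfs raises EHOSTDOWN while patched.
    with patch("os.statvfs", side_effect=OSError(errno.EHOSTDOWN, "Host is down")):
        while_down = probe("/")
    # Outside the patch the real statvfs runs against the local root fs.
    after_recovery = probe("/")
    return while_down, after_recovery
```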

* mounts: drive systemd job waits via JobRemoved instead of state polling

`_update_state_await` had a race that has been present since network
mounts were introduced (#4269) and survived every subsequent rewrite
(#4733, c75f36305): the wait function reads the unit's ActiveState
*after* the systemd dispatch returns, then matches against a target
set that almost always includes the pre-call state (ACTIVE). If
systemd has not yet started dispatching the queued job — and for
fast operations like CIFS reload (smb3_reconfigure completes in
milliseconds with no server contact) it routinely has not — the wait
matches immediately on the pre-call state and returns "done" before
the operation has actually begun. The race was invisible until the
probe commit started doing honest network work after the wait
returned, at which point we spent a full ~30s NFS probe wait on a
mount that systemd was still in the middle of restarting.

Switch the job-dispatching paths (mount/unmount/reload/restart) to
systemd's `JobRemoved` Manager signal. The pattern:

    async with sys_dbus.systemd.job_removed() as jobs:
        job_path = await sys_dbus.systemd.restart_unit(...)
        await jobs.wait_for_job(job_path)

is structurally race-free: subscription is set up before dispatch,
the job_path is allocated synchronously inside the dispatch call,
and any JobRemoved for that path after subscription is queued. Even
for operations that complete faster than we can process the dispatch
return value (the CIFS reload case), the signal cannot be missed.
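
The difference between the two waiting strategies can be shown with a toy
model. `FakeSystemd` and both wait helpers are hypothetical and only mimic
the ordering of events; they do not use D-Bus or the supervisor API.

```python
import asyncio


class FakeSystemd:
    """Toy stand-in: queues a job and finishes it slightly later."""

    def __init__(self) -> None:
        self.active_state = "active"
        self.finished: set[str] = set()
        self._subscribers: list[asyncio.Queue] = []
        self._tasks: set[asyncio.Task] = set()
        self._next_id = 0

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    async def restart_unit(self) -> str:
        self._next_id += 1
        job_path = f"/job/{self._next_id}"
        task = asyncio.create_task(self._run(job_path))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return job_path  # returned before the job has started running

    async def _run(self, job_path: str) -> None:
        await asyncio.sleep(0.01)
        self.active_state = "deactivating"
        await asyncio.sleep(0.01)
        self.active_state = "active"
        self.finished.add(job_path)
        for queue in self._subscribers:
            queue.put_nowait((job_path, "done"))


async def wait_by_state(systemd: FakeSystemd) -> bool:
    job_path = await systemd.restart_unit()
    # The pre-call state is already in the target set, so this "wait"
    # matches at once, before the job has even begun.
    assert systemd.active_state in {"active", "failed"}
    return job_path in systemd.finished


async def wait_by_signal(systemd: FakeSystemd) -> bool:
    queue = systemd.subscribe()  # subscribe BEFORE dispatching the job
    job_path = await systemd.restart_unit()
    while True:
        path, _result = await queue.get()
        if path == job_path:
            return job_path in systemd.finished


async def demo() -> tuple[bool, bool]:
    return await wait_by_state(FakeSystemd()), await wait_by_signal(FakeSystemd())
```

In this model the state-based wait reports completion while the job is
still pending, whereas the signal-based wait returns only after it finished.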

`_update_state_await` is kept for `load()` — that path observes
existing state without having dispatched a job, so the
PropertiesChanged-driven wait is the right primitive there.

* mounts: verify the path is a mount point before trusting statvfs

The userspace probe trusted statvfs() alone to declare a network mount
healthy. statvfs is uncacheable client-side for both NFS and CIFS, so
on a real mount it forces a wire RPC — but if the mount is gone from
the kernel mount table (e.g., after a restart cycle whose umount
succeeded but whose mount step failed, leaving the unit ACTIVE in
systemd's view), statvfs() operates on a plain directory on the
underlying root filesystem and happily returns the root fs's stats.
The mount is reported healthy when in fact it doesn't exist.

Add a pre-check using `Path.is_mount` — a parent-vs-path `st_dev`
comparison via stat() — to detect "this path is not actually a mount
point" before issuing statvfs. For the ghost-mount case (path on the
root fs) both stats are local and return immediately. For a real
mount the path-stat may cross into the filesystem driver, but the
result is correct in either case and the statvfs that follows
catches server-unreachable mounts that is_mount can't.

The full is_mounted contract is now: ACTIVE per systemd, present as
a mount point in our namespace, and server-reachable per statvfs.
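
The three-part contract can be sketched as a single function. The name
`is_mounted` and the `systemd_active` parameter are illustrative only; the
real method obtains the systemd state itself and is asynchronous.

```python
import os
import tempfile
from pathlib import Path


def is_mounted(path: Path, systemd_active: bool) -> bool:
    """Illustrative check: systemd state, mount point, then statvfs."""
    if not systemd_active:
        return False
    try:
        # Ghost-mount guard: a plain directory on the parent filesystem
        # would otherwise pass statvfs() with the parent's stats.
        if not path.is_mount():
            return False
        # Reachability: described above as forcing a request to the server.
        os.statvfs(path)
    except OSError:
        return False
    return True


def demo() -> tuple[bool, bool, bool]:
    with tempfile.TemporaryDirectory() as tmp:
        plain_dir = Path(tmp) / "not_a_mount"
        plain_dir.mkdir()
        ghost = is_mounted(plain_dir, systemd_active=True)
    root = is_mounted(Path("/"), systemd_active=True)
    inactive = is_mounted(Path("/"), systemd_active=False)
    return ghost, root, inactive
```

A freshly created directory fails the mount point check even though
statvfs() on it would succeed, which is the ghost-mount case.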

* mounts: skip wasted probes when systemd already reported job failed

JobRemoved tells us whether the systemd job completed successfully —
we just weren't using it. The previous reload/restart paths ran their
post-job probe unconditionally, including the case where systemd had
already told us the operation failed. On a dead NFS share the kernel
is in transport-reconnect churn for tens of seconds after a killed
mount helper exits, so that probe takes 90+ seconds — and confirms
exactly what systemd already said. Plain wasted time.

* `Mount.reload()` now uses the systemd job result. On "done" we
  still probe (CIFS reload is local-only — smb3_reconfigure returns
  "done" even against a dead server, so the probe is the only check
  that catches the lying-CIFS case). On anything else (failed,
  timeout, canceled, dependency, skipped, or our own timeout
  returning None) we escalate directly to `_restart()` without
  probing.
* `Mount._restart()` skips the probe entirely on non-"done" results.
  The mount is definitively not active in that case and the probe
  would just spend another 30-90s on a dead share.
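
The decision logic in the two bullets above might look roughly like this.
All names (`reload_mount` and the injected callables) are hypothetical; the
sketch only shows when the probe is and is not invoked.

```python
import asyncio


async def reload_mount(run_reload_job, probe, restart) -> str:
    """Probe only when systemd reported "done"; otherwise restart directly."""
    result = await run_reload_job()
    if result == "done" and await probe():
        return "reloaded"
    # Either the job did not finish with "done" (no probe was run), or it
    # did but the probe failed: escalate to a restart in both cases.
    await restart()
    return "restarted"


async def demo() -> tuple[str, int, str, int, int]:
    probes = 0
    restarts = 0

    async def probe_dead() -> bool:
        nonlocal probes
        probes += 1
        return False

    async def restart() -> None:
        nonlocal restarts
        restarts += 1

    async def job_failed() -> str:
        return "failed"

    async def job_done() -> str:
        return "done"

    first = await reload_mount(job_failed, probe_dead, restart)
    probes_after_failed = probes
    second = await reload_mount(job_done, probe_dead, restart)
    return first, probes_after_failed, second, probes, restarts
```

With a failed job the probe counter stays at zero; with a "done" job against
a dead share the probe runs once and the restart still happens.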

Also bump the JobRemoved wait timeout from UPDATE_STATE_TIMEOUT (40s,
sized for a single-helper invocation) to a new SYSTEMD_JOB_TIMEOUT
(90s). RestartUnit runs as stop + start, each bounded by the unit's
TimeoutSec (35s from #6834), so the worst case is ~70s plus systemd
queue dispatch. The previous 40s budget caused supervisor to time
out exactly one second before JobRemoved fired in the observed
dead-NFS restart cycle. UPDATE_STATE_TIMEOUT is kept at 40s for the
PropertiesChanged-driven wait in `load()`, where the layered timeout
invariant from #6827 still applies.
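
The timeout budget above can be checked with a few lines of arithmetic.
The constant values are the ones quoted in this message; the helper name
`restart_worst_case` is hypothetical.

```python
UNIT_TIMEOUT_SEC = 35      # per-step bound (the unit's TimeoutSec)
UPDATE_STATE_TIMEOUT = 40  # previous wait budget, kept for load()
SYSTEMD_JOB_TIMEOUT = 90   # new budget for JobRemoved waits


def restart_worst_case(unit_timeout: int) -> int:
    """RestartUnit runs as stop + start, each bounded by the unit timeout."""
    return 2 * unit_timeout


worst_case = restart_worst_case(UNIT_TIMEOUT_SEC)
headroom = SYSTEMD_JOB_TIMEOUT - worst_case
```

The old 40s budget is smaller than the 70s worst case, while the new 90s
budget leaves about 20s for systemd queue dispatch.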

* dbus: filter signals at the wrapper level, drop JobRemovedSignal class

Mike's review on #6838: instead of carrying a bespoke JobRemovedSignal
context-manager class, push the filtering concept into the generic
DBusSignalWrapper. Any caller that subscribes to a broadcast signal
but only cares about specific payloads can pass a predicate.

* DBus.signal() gains an optional `message_filter: Callable[..., bool]`.
* DBusSignalWrapper.wait_for_signal() loops past messages where the
  filter returns False, returning the next match.
* JobRemovedSignal goes away. supervisor/dbus/systemd.py exposes a
  small factory `job_removed_filter(get_job_path)` that returns the
  matching predicate; the getter indirection lets callers subscribe
  before the dispatch returns the job path (which is required to be
  race-free — see the previous commit).

Mount._run_systemd_job() switches to:

    job_path: str | None = None
    async with systemd.connected_dbus.signal(
        DBUS_SIGNAL_SYSTEMD_JOB_REMOVED,
        job_removed_filter(lambda: job_path),
    ) as signal:
        job_path = await dispatch
        _id, _path, _unit, result = await signal.wait_for_signal()

The behavior is identical to the previous JobRemovedSignal-based
implementation. The signal queue still buffers messages received
after AddMatch is installed, so subscribing before dispatch keeps
the race-free guarantee.
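
The getter indirection can be illustrated without D-Bus. In this sketch an
`asyncio.Queue` stands in for the buffered signal queue, and
`job_path_filter` and `wait_for_match` are hypothetical names, not the
supervisor's `job_removed_filter` or `wait_for_signal`.

```python
import asyncio
from collections.abc import Callable
from typing import Optional


def job_path_filter(get_job_path: Callable[[], Optional[str]]) -> Callable[..., bool]:
    """Build a predicate that matches JobRemoved-style messages by job path."""

    def _matches(_job_id: int, path: str, _unit: str, _result: str) -> bool:
        # The path is looked up at match time, not at subscription time.
        return path == get_job_path()

    return _matches


async def wait_for_match(queue: asyncio.Queue, message_filter: Callable[..., bool]):
    """Skip buffered messages until one passes the filter."""
    while True:
        message = await queue.get()
        if message_filter(*message):
            return message


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    job_path: Optional[str] = None
    message_filter = job_path_filter(lambda: job_path)

    # Signals arrive and are buffered before our job path is known.
    queue.put_nowait((1, "/job/1", "other.mount", "done"))
    queue.put_nowait((2, "/job/2", "mine.mount", "failed"))

    job_path = "/job/2"  # the dispatch call has now returned
    return await wait_for_match(queue, message_filter)
```

Because the lambda reads `job_path` lazily, the filter can be installed
before the path exists and still select the right message afterwards.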

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 09:29:53 +02:00

2227 lines
78 KiB
Python

"""Test BackupManager class."""

import asyncio
import errno
from functools import partial
from pathlib import Path
from shutil import copy, rmtree
from types import SimpleNamespace
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, PropertyMock, patch

from aiodocker.containers import DockerContainer
from awesomeversion import AwesomeVersion
from dbus_fast import DBusError
import pytest

from supervisor.apps.app import App
from supervisor.apps.const import AppBackupMode
from supervisor.apps.model import AppModel
from supervisor.backups.backup import Backup, BackupLocation
from supervisor.backups.const import LOCATION_TYPE, BackupJobStage, BackupType
from supervisor.backups.manager import BackupManager
from supervisor.const import FOLDER_HOMEASSISTANT, FOLDER_SHARE, AppState, CoreState
from supervisor.coresys import CoreSys
from supervisor.docker.app import DockerApp
from supervisor.docker.const import ContainerState
from supervisor.docker.homeassistant import DockerHomeAssistant
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import (
    BackupError,
    BackupFileNotFoundError,
    BackupInvalidError,
    BackupJobError,
    BackupMountDownError,
    DockerError,
)
from supervisor.homeassistant.api import HomeAssistantAPI
from supervisor.homeassistant.const import WSType
from supervisor.homeassistant.core import HomeAssistantCore
from supervisor.homeassistant.module import HomeAssistant
from supervisor.jobs import JobSchedulerOptions
from supervisor.jobs.const import JobCondition
from supervisor.mounts.mount import Mount
from supervisor.resolution.const import UnhealthyReason
from supervisor.utils.json import read_json_file, write_json_file

from tests.common import get_fixture_path
from tests.const import TEST_ADDON_SLUG
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.systemd import Systemd as SystemdService
from tests.dbus_service_mocks.systemd_unit import SystemdUnit as SystemdUnitService


async def test_do_backup_full(coresys: CoreSys, backup_mock, install_app_ssh):
    """Test creating Backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    # backup_mock fixture causes Backup() to be a MagicMock
    backup_instance: MagicMock = await manager.do_backup_full()

    # Check Backup has been created without password
    assert backup_instance.new.call_args[0][2] == BackupType.FULL
    assert backup_instance.new.call_args[0][3] is None
    assert backup_instance.new.call_args[0][4] is True

    backup_instance.store_homeassistant.assert_called_once()
    backup_instance.store_repositories.assert_called_once()

    backup_instance.store_apps.assert_called_once()
    assert install_app_ssh in backup_instance.store_apps.call_args[0][0]

    backup_instance.store_folders.assert_called_once()
    assert len(backup_instance.store_folders.call_args[0][0]) == 4

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.parametrize(
    ("filename", "filename_expected"),
    [("../my file.tar", "/data/backup/my file.tar"), (None, "/data/backup/{}.tar")],
)
async def test_do_backup_full_with_filename(
    coresys: CoreSys, filename: str, filename_expected: str, backup_mock
):
    """Test creating Backup with a specific file name."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    # backup_mock fixture causes Backup() to be a MagicMock
    await manager.do_backup_full(filename=filename)

    slug = backup_mock.call_args[0][2]
    assert str(backup_mock.call_args[0][1]) == filename_expected.format(slug)

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("backup_mock")
async def test_do_backup_full_uncompressed(coresys: CoreSys, install_app_ssh: App):
    """Test creating Backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    # backup_mock fixture causes Backup() to be a MagicMock
    backup_instance: MagicMock = await manager.do_backup_full(compressed=False)

    # Check Backup has been created without password
    assert backup_instance.new.call_args[0][2] == BackupType.FULL
    assert backup_instance.new.call_args[0][3] is None
    assert backup_instance.new.call_args[0][4] is False

    backup_instance.store_homeassistant.assert_called_once()
    backup_instance.store_repositories.assert_called_once()

    backup_instance.store_apps.assert_called_once()
    assert install_app_ssh in backup_instance.store_apps.call_args[0][0]

    backup_instance.store_folders.assert_called_once()
    assert len(backup_instance.store_folders.call_args[0][0]) == 4
    backup_instance.store_homeassistant.assert_called_once()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("backup_mock", "install_app_ssh")
async def test_do_backup_partial_minimal(coresys: CoreSys):
    """Test creating minimal partial Backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    # backup_mock fixture causes Backup() to be a MagicMock
    backup_instance: MagicMock = await manager.do_backup_partial(homeassistant=False)

    # Check Backup has been created without password
    assert backup_instance.new.call_args[0][2] == BackupType.PARTIAL
    assert backup_instance.new.call_args[0][3] is None
    assert backup_instance.new.call_args[0][4] is True

    backup_instance.store_homeassistant.assert_not_called()
    backup_instance.store_repositories.assert_called_once()
    backup_instance.store_apps.assert_not_called()
    backup_instance.store_folders.assert_not_called()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("backup_mock", "install_app_ssh")
async def test_do_backup_partial_minimal_uncompressed(coresys: CoreSys):
    """Test creating minimal partial Backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    # backup_mock fixture causes Backup() to be a MagicMock
    backup_instance: MagicMock = await manager.do_backup_partial(
        homeassistant=False, compressed=False
    )

    # Check Backup has been created without password
    assert backup_instance.new.call_args[0][2] == BackupType.PARTIAL
    assert backup_instance.new.call_args[0][3] is None
    assert backup_instance.new.call_args[0][4] is False

    backup_instance.store_homeassistant.assert_not_called()
    backup_instance.store_repositories.assert_called_once()
    backup_instance.store_apps.assert_not_called()
    backup_instance.store_folders.assert_not_called()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("backup_mock")
async def test_do_backup_partial_maximal(coresys: CoreSys, install_app_ssh: App):
    """Test creating maximal partial Backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    # backup_mock fixture causes Backup() to be a MagicMock
    backup_instance: MagicMock = await manager.do_backup_partial(
        apps=[TEST_ADDON_SLUG],
        folders=[FOLDER_SHARE, FOLDER_HOMEASSISTANT],
        homeassistant=True,
    )

    # Check Backup has been created without password
    assert backup_instance.new.call_args[0][2] == BackupType.PARTIAL
    assert backup_instance.new.call_args[0][3] is None
    assert backup_instance.new.call_args[0][4] is True

    backup_instance.store_homeassistant.assert_called_once()
    backup_instance.store_repositories.assert_called_once()

    backup_instance.store_apps.assert_called_once()
    assert install_app_ssh in backup_instance.store_apps.call_args[0][0]

    backup_instance.store_folders.assert_called_once()
    assert len(backup_instance.store_folders.call_args[0][0]) == 1
    backup_instance.store_homeassistant.assert_called_once()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("supervisor_internet")
async def test_do_restore_full(
    coresys: CoreSys, full_backup_mock: Backup, install_app_ssh: App
):
    """Test restoring full Backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.homeassistant.core.start = AsyncMock(return_value=None)
    coresys.homeassistant.core.stop = AsyncMock(return_value=None)
    coresys.homeassistant.core.update = AsyncMock(return_value=None)
    install_app_ssh.uninstall = AsyncMock(return_value=None)

    manager = await BackupManager(coresys).load_config()

    backup_instance = full_backup_mock.return_value
    backup_instance.sys_apps = coresys.apps
    backup_instance.remove_delta_apps = partial(
        Backup.remove_delta_apps, backup_instance
    )
    assert await manager.do_restore_full(backup_instance)

    backup_instance.restore_homeassistant.assert_called_once()
    backup_instance.restore_repositories.assert_called_once()

    backup_instance.restore_apps.assert_called_once()
    install_app_ssh.uninstall.assert_not_called()

    backup_instance.restore_folders.assert_called_once()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("supervisor_internet")
async def test_do_restore_full_different_app(
    coresys: CoreSys, full_backup_mock: Backup, install_app_ssh: App
):
    """Test restoring full Backup with different apps than installed."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.homeassistant.core.start = AsyncMock(return_value=None)
    coresys.homeassistant.core.stop = AsyncMock(return_value=None)
    coresys.homeassistant.core.update = AsyncMock(return_value=None)
    install_app_ssh.uninstall = AsyncMock(return_value=None)

    manager = await BackupManager(coresys).load_config()

    backup_instance = full_backup_mock.return_value
    backup_instance.app_list = ["differentslug"]
    backup_instance.sys_apps = coresys.apps
    backup_instance.remove_delta_apps = partial(
        Backup.remove_delta_apps, backup_instance
    )
    assert await manager.do_restore_full(backup_instance)

    backup_instance.restore_homeassistant.assert_called_once()
    backup_instance.restore_repositories.assert_called_once()

    backup_instance.restore_apps.assert_called_once()
    install_app_ssh.uninstall.assert_called_once()

    backup_instance.restore_folders.assert_called_once()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("supervisor_internet", "install_app_ssh")
async def test_do_restore_partial_minimal(
    coresys: CoreSys, partial_backup_mock: Backup
):
    """Test restoring partial Backup minimal."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.homeassistant.core.start = AsyncMock(return_value=None)
    coresys.homeassistant.core.stop = AsyncMock(return_value=None)
    coresys.homeassistant.core.update = AsyncMock(return_value=None)

    manager = await BackupManager(coresys).load_config()

    backup_instance = partial_backup_mock.return_value
    assert await manager.do_restore_partial(backup_instance, homeassistant=False)

    backup_instance.restore_homeassistant.assert_not_called()
    backup_instance.restore_repositories.assert_not_called()
    backup_instance.restore_apps.assert_not_called()
    backup_instance.restore_folders.assert_not_called()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("supervisor_internet")
async def test_do_restore_partial_maximal(
    coresys: CoreSys, partial_backup_mock: Backup
):
    """Test restoring partial Backup maximal."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.homeassistant.core.start = AsyncMock(return_value=None)
    coresys.homeassistant.core.stop = AsyncMock(return_value=None)
    coresys.homeassistant.core.update = AsyncMock(return_value=None)

    manager = await BackupManager(coresys).load_config()

    backup_instance = partial_backup_mock.return_value
    assert await manager.do_restore_partial(
        backup_instance,
        apps=[TEST_ADDON_SLUG],
        folders=[FOLDER_SHARE, FOLDER_HOMEASSISTANT],
        homeassistant=True,
    )

    backup_instance.restore_homeassistant.assert_called_once()
    backup_instance.restore_repositories.assert_called_once()
    backup_instance.restore_apps.assert_called_once()
    backup_instance.restore_folders.assert_called_once()
    backup_instance.restore_homeassistant.assert_called_once()

    assert coresys.core.state == CoreState.RUNNING


@pytest.mark.usefixtures("supervisor_internet")
async def test_fail_invalid_full_backup(
    coresys: CoreSys,
    full_backup_mock: MagicMock,
    partial_backup_mock: MagicMock,
):
    """Test restore fails with invalid backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    with pytest.raises(BackupInvalidError):
        await manager.do_restore_full(partial_backup_mock.return_value)

    backup_instance = full_backup_mock.return_value
    backup_instance.all_locations[None].protected = True
    backup_instance.validate_backup.side_effect = BackupInvalidError()

    with pytest.raises(BackupInvalidError):
        await manager.do_restore_full(backup_instance)

    backup_instance.all_locations[None].protected = False
    backup_instance.supervisor_version = "2022.08.4"
    with (
        patch.object(
            type(coresys.supervisor),
            "version",
            new=PropertyMock(return_value="2022.08.3"),
        ),
        pytest.raises(BackupInvalidError),
    ):
        await manager.do_restore_full(backup_instance)


@pytest.mark.usefixtures("supervisor_internet")
async def test_fail_invalid_partial_backup(
    coresys: CoreSys, partial_backup_mock: MagicMock
):
    """Test restore fails with invalid backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    manager = await BackupManager(coresys).load_config()

    backup_instance = partial_backup_mock.return_value
    backup_instance.all_locations[None].protected = True
    backup_instance.validate_backup.side_effect = BackupInvalidError()

    with pytest.raises(BackupInvalidError):
        await manager.do_restore_partial(backup_instance)

    backup_instance.all_locations[None].protected = False
    backup_instance.homeassistant = None

    with pytest.raises(BackupInvalidError):
        await manager.do_restore_partial(backup_instance, homeassistant=True)

    backup_instance.supervisor_version = "2022.08.4"
    with (
        patch.object(
            type(coresys.supervisor),
            "version",
            new=PropertyMock(return_value="2022.08.3"),
        ),
        pytest.raises(BackupInvalidError),
    ):
        await manager.do_restore_partial(backup_instance)


@pytest.mark.usefixtures("install_app_ssh", "capture_exception")
async def test_backup_error_homeassistant(coresys: CoreSys, backup_mock: MagicMock):
    """Test error collected and file deleted when Home Assistant Core backup fails."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    backup_instance = backup_mock.return_value
    backup_instance.store_homeassistant.side_effect = (
        err := BackupError("Error while storing homeassistant")
    )

    job, backup_task = coresys.jobs.schedule_job(
        coresys.backups.do_backup_full, JobSchedulerOptions()
    )
    assert await backup_task is None

    assert job.errors[0].type_ is type(err)
    assert job.errors[0].message == str(err)
    assert job.errors[0].stage == BackupJobStage.HOME_ASSISTANT

    backup_instance.tarfile.unlink.assert_called_once()


@pytest.mark.usefixtures("install_app_ssh")
async def test_backup_error_capture(
    coresys: CoreSys, backup_mock: MagicMock, capture_exception: Mock
):
    """Test error captured when backup fails."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    backup_mock.return_value.store_folders.side_effect = (err := OSError())
    backup = await coresys.backups.do_backup_full()

    assert backup is None
    capture_exception.assert_called_once_with(err)


@pytest.mark.usefixtures("supervisor_internet")
async def test_restore_error(
    coresys: CoreSys, full_backup_mock: MagicMock, capture_exception: Mock
):
    """Test restoring full Backup with errors."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.homeassistant.core.start = AsyncMock(return_value=None)

    backup_instance = full_backup_mock.return_value
    backup_instance.protected = False
    backup_instance.restore_homeassistant.side_effect = BackupError()
    with pytest.raises(BackupError):
        await coresys.backups.do_restore_full(backup_instance)
    capture_exception.assert_not_called()

    backup_instance.restore_homeassistant.side_effect = (err := DockerError())
    with pytest.raises(BackupError):
        await coresys.backups.do_restore_full(backup_instance)
    capture_exception.assert_called_once_with(err)
@pytest.mark.usefixtures(
"supervisor_internet",
"tmp_supervisor_data",
"path_extern",
"mount_propagation",
"mock_is_mount",
)
async def test_backup_media_with_mounts(
coresys: CoreSys, all_dbus_services: dict[str, DBusServiceMock]
):
"""Test backing up media folder with mounts."""
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_service.response_get_unit = [
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
]
# Make some normal test files
(test_file_1 := coresys.config.path_media / "test.txt").touch()
(test_dir := coresys.config.path_media / "test").mkdir()
(test_file_2 := coresys.config.path_media / "test" / "inner.txt").touch()
# Add a media mount
await coresys.mounts.load()
await coresys.mounts.create_mount(
Mount.from_dict(
coresys,
{
"name": "media_test",
"usage": "media",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
)
assert (mount_dir := coresys.config.path_media / "media_test").is_dir()
# Make a partial backup
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"])
# Remove the mount and wipe the media folder
systemd_unit_service.active_state = "inactive"
await coresys.mounts.remove_mount("media_test")
rmtree(coresys.config.path_media)
coresys.config.path_media.mkdir()
# Restore the backup and check that only the test files we made returned
with patch.object(DockerHomeAssistant, "is_running", return_value=True):
await coresys.backups.do_restore_partial(backup, folders=["media"])
assert test_file_1.exists()
assert test_dir.is_dir()
assert test_file_2.exists()
assert not mount_dir.exists()
@pytest.mark.usefixtures(
"supervisor_internet",
"tmp_supervisor_data",
"path_extern",
"mount_propagation",
"mock_is_mount",
)
async def test_backup_media_with_mounts_retains_files(
coresys: CoreSys, all_dbus_services: dict[str, DBusServiceMock]
):
"""Test backing up media folder with mounts retains mount files."""
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_unit_service.active_state = ["active", "active", "active", "inactive"]
systemd_service.response_get_unit = [
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
]
# Add a media mount
await coresys.mounts.load()
await coresys.mounts.create_mount(
Mount.from_dict(
coresys,
{
"name": "media_test",
"usage": "media",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
)
# Make a partial backup
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"])
systemd_service.StopUnit.calls.clear()
systemd_service.StartTransientUnit.calls.clear()
with patch.object(DockerHomeAssistant, "is_running", return_value=True):
await coresys.backups.do_restore_partial(backup, folders=["media"])
assert systemd_service.StopUnit.calls == [
("mnt-data-supervisor-media-media_test.mount", "fail")
]
assert systemd_service.StartTransientUnit.calls == [
("mnt-data-supervisor-media-media_test.mount", "fail", ANY, [])
]
@pytest.mark.usefixtures(
"supervisor_internet",
"tmp_supervisor_data",
"path_extern",
"mount_propagation",
"mock_is_mount",
)
async def test_backup_share_with_mounts(
coresys: CoreSys, all_dbus_services: dict[str, DBusServiceMock]
):
"""Test backing up share folder with mounts."""
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_unit_service.active_state = [
"active",
"active",
"active",
"inactive",
"active",
"inactive",
]
systemd_service.response_get_unit = [
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
]
# Make some normal test files
(test_file_1 := coresys.config.path_share / "test.txt").touch()
(test_dir := coresys.config.path_share / "test").mkdir()
(test_file_2 := coresys.config.path_share / "test" / "inner.txt").touch()
# Add a share mount
await coresys.mounts.load()
await coresys.mounts.create_mount(
Mount.from_dict(
coresys,
{
"name": "share_test",
"usage": "share",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
)
assert (mount_dir := coresys.config.path_share / "share_test").is_dir()
# Make a partial backup
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["share"])
# Remove the mount and wipe the share folder
await coresys.mounts.remove_mount("share_test")
rmtree(coresys.config.path_share)
coresys.config.path_share.mkdir()
# Restore the backup and check that only the test files we made returned
with patch.object(DockerHomeAssistant, "is_running", return_value=True):
await coresys.backups.do_restore_partial(backup, folders=["share"])
assert test_file_1.exists()
assert test_dir.is_dir()
assert test_file_2.exists()
assert not mount_dir.exists()
@pytest.mark.usefixtures(
"supervisor_internet",
"tmp_supervisor_data",
"path_extern",
"mount_propagation",
"mock_is_mount",
)
async def test_full_backup_to_mount(coresys: CoreSys):
"""Test full backup to and restoring from a mount."""
(marker := coresys.config.path_homeassistant / "test.txt").touch()
# Add a backup mount
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
await coresys.mounts.load()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"usage": "backup",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
await coresys.mounts.create_mount(mount)
assert "backup_test" in coresys.backups.backup_locations
assert coresys.backups.backup_locations["backup_test"] == mount_dir
# Make a backup and add it to mounts. Confirm it exists in the right place
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_full("test", location=mount)
assert (mount_dir / f"{backup.slug}.tar").exists()
# Reload and check that backups in mounts are listed
await coresys.backups.reload()
assert coresys.backups.get(backup.slug)
# Remove marker file and restore. Confirm it comes back
marker.unlink()
with (
patch.object(DockerHomeAssistant, "is_running", return_value=True),
patch.object(
Backup,
"restore_supervisor_config",
new_callable=AsyncMock,
return_value=(True, []),
),
):
await coresys.backups.do_restore_full(backup)
assert marker.exists()
@pytest.mark.usefixtures(
"supervisor_internet",
"tmp_supervisor_data",
"path_extern",
"mount_propagation",
"mock_is_mount",
)
async def test_partial_backup_to_mount(coresys: CoreSys):
"""Test partial backup to and restoring from a mount."""
(marker := coresys.config.path_homeassistant / "test.txt").touch()
# Add a backup mount
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
await coresys.mounts.load()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"usage": "backup",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
await coresys.mounts.create_mount(mount)
assert "backup_test" in coresys.backups.backup_locations
assert coresys.backups.backup_locations["backup_test"] == mount_dir
# Make a backup and add it to mounts. Confirm it exists in the right place
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
HomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
):
backup: Backup = await coresys.backups.do_backup_partial(
"test", homeassistant=True, location=mount
)
assert (mount_dir / f"{backup.slug}.tar").exists()
# Reload and check that backups in mounts are listed
await coresys.backups.reload()
assert coresys.backups.get(backup.slug)
# Remove marker file and restore. Confirm it comes back
marker.unlink()
with (
patch.object(DockerHomeAssistant, "is_running", return_value=True),
patch.object(
Backup,
"restore_supervisor_config",
new_callable=AsyncMock,
return_value=(True, []),
),
):
await coresys.backups.do_restore_partial(backup, homeassistant=True)
assert marker.exists()
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern", "mount_propagation")
async def test_backup_to_down_mount_error(coresys: CoreSys, mock_is_mount: MagicMock):
"""Test backup to mount when down raises error."""
# Add a backup mount
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
await coresys.mounts.load()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"usage": "backup",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
await coresys.mounts.create_mount(mount)
assert "backup_test" in coresys.backups.backup_locations
assert coresys.backups.backup_locations["backup_test"] == mount_dir
# Attempt to make a backup which fails because the probe (statvfs)
# surfaces the server as unreachable.
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch(
"supervisor.mounts.mount._probe_network_mount",
side_effect=OSError(errno.EHOSTDOWN, "Host is down"),
):
with pytest.raises(BackupMountDownError):
await coresys.backups.do_backup_full("test", location=mount)
with pytest.raises(BackupMountDownError):
await coresys.backups.do_backup_partial(
"test", location=mount, homeassistant=True
)
@pytest.mark.usefixtures(
"tmp_supervisor_data", "path_extern", "mount_propagation", "mock_is_mount"
)
async def test_backup_to_local_with_default(coresys: CoreSys):
"""Test making backup to local when a default mount is specified."""
# Add a default backup mount
await coresys.mounts.load()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"usage": "backup",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount
# Make a backup to local (location=None). Confirm it exists in the local backup folder
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
HomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
):
backup: Backup = await coresys.backups.do_backup_partial(
"test", homeassistant=True, location=None
)
assert (coresys.config.path_backup / f"{backup.slug}.tar").exists()
@pytest.mark.usefixtures(
"tmp_supervisor_data", "path_extern", "mount_propagation", "mock_is_mount"
)
async def test_backup_to_default(coresys: CoreSys):
"""Test making backup to default mount."""
# Add a default backup mount
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
await coresys.mounts.load()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"usage": "backup",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount
# Make a backup without a location so the default mount is used. Confirm it exists there
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
HomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
):
backup: Backup = await coresys.backups.do_backup_partial(
"test", homeassistant=True
)
assert (mount_dir / f"{backup.slug}.tar").exists()
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern", "mount_propagation")
async def test_backup_to_default_mount_down_error(
coresys: CoreSys, mock_is_mount: MagicMock
):
"""Test making backup to default mount when it is down."""
# Add a default backup mount
(coresys.config.path_mounts / "backup_test").mkdir()
await coresys.mounts.load()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"usage": "backup",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount
# Attempt to make a backup which fails because the probe (statvfs)
# surfaces the server as unreachable.
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with (
patch(
"supervisor.mounts.mount._probe_network_mount",
side_effect=OSError(errno.EHOSTDOWN, "Host is down"),
),
pytest.raises(BackupMountDownError),
):
await coresys.backups.do_backup_partial("test", homeassistant=True)
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern", "mount_propagation")
async def test_load_network_error(
coresys: CoreSys, caplog: pytest.LogCaptureFixture, mock_is_mount: MagicMock
):
"""Test load of backup manager when there is a network error."""
(coresys.config.path_mounts / "backup_test").mkdir()
await coresys.mounts.load()
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"usage": "backup",
"type": "cifs",
"server": "test.local",
"share": "test",
},
)
await coresys.mounts.create_mount(mount)
caplog.clear()
# This should not raise; the manager should just skip backup locations with errors
mock_is_mount.return_value = False
mock_path = MagicMock()
mock_path.is_dir.side_effect = OSError("Host is down")
mock_path.as_posix.return_value = "/data/backup_test"
with patch.object(Mount, "local_where", new=PropertyMock(return_value=mock_path)):
await coresys.backups.load()
assert "Could not list backups from /data/backup_test" in caplog.text
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_backup_with_healthcheck(
coresys: CoreSys, install_app_ssh: App, container: DockerContainer
):
"""Test backup of app with healthcheck in cold mode."""
container.show.return_value["State"]["Status"] = "running"
container.show.return_value["State"]["Running"] = True
container.show.return_value["Config"] = {"Healthcheck": "exists"}
install_app_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await install_app_ssh.load()
await asyncio.sleep(0)
assert install_app_ssh.state == AppState.STARTUP
state_changes: list[AppState] = []
_container_events_task: asyncio.Task | None = None
async def container_events():
nonlocal state_changes
await asyncio.sleep(0)
await install_app_ssh.container_state_changed(
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.STOPPED,
id="abc123",
time=1,
)
)
state_changes.append(install_app_ssh.state)
await install_app_ssh.container_state_changed(
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.RUNNING,
id="abc123",
time=1,
)
)
state_changes.append(install_app_ssh.state)
await install_app_ssh.container_state_changed(
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.HEALTHY,
id="abc123",
time=1,
)
)
async def container_events_task(*args, **kwargs):
nonlocal _container_events_task
_container_events_task = asyncio.create_task(container_events())
with (
patch.object(DockerApp, "run", new=container_events_task),
patch.object(
AppModel,
"backup_mode",
new=PropertyMock(return_value=AppBackupMode.COLD),
),
patch.object(DockerApp, "is_running", side_effect=[True, False, False]),
):
backup = await coresys.backups.do_backup_partial(
homeassistant=False, apps=["local_ssh"]
)
assert backup
assert state_changes == [AppState.STOPPED, AppState.STARTUP]
assert install_app_ssh.state == AppState.STARTED
assert coresys.core.state == CoreState.RUNNING
await _container_events_task
@pytest.mark.usefixtures("supervisor_internet", "tmp_supervisor_data", "path_extern")
async def test_restore_with_healthcheck(
coresys: CoreSys, install_app_ssh: App, container: DockerContainer
):
"""Test restore of app with healthcheck."""
container.show.return_value["State"]["Status"] = "running"
container.show.return_value["State"]["Running"] = True
container.show.return_value["Config"] = {"Healthcheck": "exists"}
install_app_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await install_app_ssh.load()
await asyncio.sleep(0)
assert install_app_ssh.state == AppState.STARTUP
backup = await coresys.backups.do_backup_partial(
homeassistant=False, apps=["local_ssh"]
)
state_changes: list[AppState] = []
_container_events_task: asyncio.Task | None = None
async def container_events():
nonlocal state_changes
await install_app_ssh.container_state_changed(
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.STOPPED,
id="abc123",
time=1,
)
)
state_changes.append(install_app_ssh.state)
await install_app_ssh.container_state_changed(
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.RUNNING,
id="abc123",
time=1,
)
)
state_changes.append(install_app_ssh.state)
await install_app_ssh.container_state_changed(
DockerContainerStateEvent(
name=f"addon_{TEST_ADDON_SLUG}",
state=ContainerState.HEALTHY,
id="abc123",
time=1,
)
)
async def container_events_task(*args, **kwargs):
nonlocal _container_events_task
_container_events_task = asyncio.create_task(container_events())
with (
patch.object(DockerApp, "run", new=container_events_task),
patch.object(DockerApp, "is_running", return_value=False),
patch.object(AppModel, "_validate_availability"),
patch.object(App, "with_ingress", new=PropertyMock(return_value=False)),
):
await coresys.backups.do_restore_partial(backup, apps=["local_ssh"])
assert state_changes == [AppState.STOPPED, AppState.STARTUP]
assert install_app_ssh.state == AppState.STARTED
assert coresys.core.state == CoreState.RUNNING
await _container_events_task
def _make_backup_message_for_assert(
*,
action: str = "full_backup",
reference: str,
stage: str | None,
done: bool = False,
progress: float = 0.0,
):
"""Make a backup message to use for assert test."""
return {
"type": "supervisor/event",
"data": {
"event": "job",
"data": {
"name": f"backup_manager_{action}",
"reference": reference,
"uuid": ANY,
"progress": progress,
"stage": stage,
"done": done,
"parent_id": None,
"errors": [],
"created": ANY,
"extra": None,
},
},
}
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_backup_progress(
coresys: CoreSys,
install_app_ssh: App,
container: DockerContainer,
ha_ws_client: AsyncMock,
):
"""Test progress is tracked during backups."""
container.show.return_value["State"]["Status"] = "running"
container.show.return_value["State"]["Running"] = True
install_app_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with (
patch.object(
AppModel,
"backup_mode",
new=PropertyMock(return_value=AppBackupMode.COLD),
),
patch("supervisor.apps.app.asyncio.Event.wait"),
):
full_backup: Backup = await coresys.backups.do_backup_full()
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] == WSType.SUPERVISOR_EVENT
and call.args[0]["data"].get("data", {}).get("name")
== "backup_manager_full_backup"
]
assert messages == [
_make_backup_message_for_assert(reference=None, stage=None),
_make_backup_message_for_assert(reference=full_backup.slug, stage=None),
_make_backup_message_for_assert(
reference=full_backup.slug, stage="addon_repositories"
),
_make_backup_message_for_assert(
reference=full_backup.slug, stage="home_assistant"
),
_make_backup_message_for_assert(reference=full_backup.slug, stage="addons"),
_make_backup_message_for_assert(reference=full_backup.slug, stage="folders"),
_make_backup_message_for_assert(
reference=full_backup.slug, stage="supervisor_config"
),
_make_backup_message_for_assert(
reference=full_backup.slug, stage="finishing_file"
),
_make_backup_message_for_assert(
reference=full_backup.slug, stage="await_addon_restarts"
),
_make_backup_message_for_assert(
reference=full_backup.slug,
stage="await_addon_restarts",
done=True,
progress=100,
),
]
ha_ws_client.async_send_command.reset_mock()
partial_backup: Backup = await coresys.backups.do_backup_partial(
apps=["local_ssh"], folders=["media", "share", "ssl"]
)
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] == WSType.SUPERVISOR_EVENT
and call.args[0]["data"].get("data", {}).get("name")
== "backup_manager_partial_backup"
]
assert messages == [
_make_backup_message_for_assert(
action="partial_backup", reference=None, stage=None
),
_make_backup_message_for_assert(
action="partial_backup", reference=partial_backup.slug, stage=None
),
_make_backup_message_for_assert(
action="partial_backup",
reference=partial_backup.slug,
stage="addon_repositories",
),
_make_backup_message_for_assert(
action="partial_backup", reference=partial_backup.slug, stage="addons"
),
_make_backup_message_for_assert(
action="partial_backup", reference=partial_backup.slug, stage="folders"
),
_make_backup_message_for_assert(
action="partial_backup",
reference=partial_backup.slug,
stage="supervisor_config",
),
_make_backup_message_for_assert(
action="partial_backup",
reference=partial_backup.slug,
stage="finishing_file",
),
_make_backup_message_for_assert(
action="partial_backup",
reference=partial_backup.slug,
stage="finishing_file",
done=True,
progress=100,
),
]
@pytest.mark.usefixtures("supervisor_internet", "tmp_supervisor_data", "path_extern")
async def test_restore_progress(
coresys: CoreSys,
install_app_ssh: App,
container: DockerContainer,
ha_ws_client: AsyncMock,
):
"""Test progress is tracked during restores."""
container.show.return_value["State"]["Status"] = "running"
container.show.return_value["State"]["Running"] = True
install_app_ssh.path_data.mkdir()
install_app_ssh.state = AppState.STARTED
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
full_backup: Backup = await coresys.backups.do_backup_full()
await asyncio.sleep(0)
ha_ws_client.async_send_command.reset_mock()
# Install another app to be uninstalled
# Duplicate code from install_app_example fixture
# Apparently request.getfixturevalue does not work with async fixtures: https://github.com/pytest-dev/pytest-asyncio/issues/112
store = coresys.apps.store["local_example"]
await coresys.apps.data.install(store)
# pylint: disable-next=protected-access
coresys.apps.data._data = coresys.apps.data._schema(coresys.apps.data._data)
coresys.apps.local[store.slug] = App(coresys, store.slug)
with (
patch("supervisor.apps.app.asyncio.Event.wait"),
patch.object(HomeAssistant, "restore"),
patch.object(HomeAssistantCore, "update"),
patch.object(AppModel, "_validate_availability"),
patch.object(AppModel, "with_ingress", new=PropertyMock(return_value=False)),
):
await coresys.backups.do_restore_full(full_backup)
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] == WSType.SUPERVISOR_EVENT
and call.args[0]["data"].get("data", {}).get("name")
== "backup_manager_full_restore"
]
assert messages == [
_make_backup_message_for_assert(
action="full_restore", reference=None, stage=None
),
_make_backup_message_for_assert(
action="full_restore", reference=full_backup.slug, stage=None
),
_make_backup_message_for_assert(
action="full_restore", reference=full_backup.slug, stage="folders"
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="home_assistant",
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="remove_delta_addons",
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="addon_repositories",
),
_make_backup_message_for_assert(
action="full_restore", reference=full_backup.slug, stage="addons"
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="supervisor_config",
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="await_addon_restarts",
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="await_home_assistant_restart",
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="await_home_assistant_restart",
done=True,
progress=100,
),
]
folders_backup: Backup = await coresys.backups.do_backup_partial(
folders=["media", "share", "ssl"]
)
ha_ws_client.async_send_command.reset_mock()
await coresys.backups.do_restore_partial(
folders_backup, folders=["media", "share", "ssl"]
)
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] == WSType.SUPERVISOR_EVENT
and call.args[0]["data"].get("data", {}).get("name")
== "backup_manager_partial_restore"
]
assert messages == [
_make_backup_message_for_assert(
action="partial_restore", reference=None, stage=None
),
_make_backup_message_for_assert(
action="partial_restore",
reference=folders_backup.slug,
stage=None,
),
_make_backup_message_for_assert(
action="partial_restore",
reference=folders_backup.slug,
stage="folders",
),
_make_backup_message_for_assert(
action="partial_restore",
reference=folders_backup.slug,
stage="supervisor_config",
),
_make_backup_message_for_assert(
action="partial_restore",
reference=folders_backup.slug,
stage="supervisor_config",
done=True,
progress=100,
),
]
container.show.return_value["State"]["Status"] = "stopped"
container.show.return_value["State"]["Running"] = False
install_app_ssh.state = AppState.STOPPED
app_backup: Backup = await coresys.backups.do_backup_partial(apps=["local_ssh"])
ha_ws_client.async_send_command.reset_mock()
with (
patch.object(AppModel, "_validate_availability"),
patch.object(HomeAssistantCore, "start"),
):
await coresys.backups.do_restore_partial(app_backup, apps=["local_ssh"])
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] == WSType.SUPERVISOR_EVENT
and call.args[0]["data"].get("data", {}).get("name")
== "backup_manager_partial_restore"
]
assert messages == [
_make_backup_message_for_assert(
action="partial_restore", reference=None, stage=None
),
_make_backup_message_for_assert(
action="partial_restore",
reference=app_backup.slug,
stage=None,
),
_make_backup_message_for_assert(
action="partial_restore",
reference=app_backup.slug,
stage="addon_repositories",
),
_make_backup_message_for_assert(
action="partial_restore",
reference=app_backup.slug,
stage="addons",
),
_make_backup_message_for_assert(
action="partial_restore",
reference=app_backup.slug,
stage="supervisor_config",
),
_make_backup_message_for_assert(
action="partial_restore",
reference=app_backup.slug,
stage="supervisor_config",
done=True,
progress=100,
),
]
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_freeze_thaw(
coresys: CoreSys,
install_app_ssh: App,
container: DockerContainer,
ha_ws_client: AsyncMock,
):
"""Test manual freeze and thaw for external snapshots."""
container.show.return_value["State"]["Status"] = "running"
container.show.return_value["State"]["Running"] = True
install_app_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
with (
patch.object(
AppModel, "backup_pre", new=PropertyMock(return_value="pre_backup")
),
patch.object(
AppModel, "backup_post", new=PropertyMock(return_value="post_backup")
),
):
# Run the freeze
await coresys.backups.freeze_all()
container.exec.assert_called_once_with("pre_backup")
assert coresys.core.state == CoreState.FREEZE
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] in ["backup/start", "backup/end"]
or call.args[0]["data"].get("data", {}).get("name")
in ["backup_manager_freeze_all", "backup_manager_thaw_all"]
]
assert messages == [
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage=None
),
{"type": "backup/start"},
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage="home_assistant"
),
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage="addons"
),
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage=None
),
_make_backup_message_for_assert(
action="freeze_all",
reference=None,
stage="addons",
done=True,
progress=100,
),
]
# Release the thaw task
container.exec.reset_mock()
ha_ws_client.async_send_command.reset_mock()
await coresys.backups.thaw_all()
container.exec.assert_called_once_with("post_backup")
assert coresys.core.state == CoreState.RUNNING
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] in ["backup/start", "backup/end"]
or call.args[0]["data"].get("data", {}).get("name")
in ["backup_manager_freeze_all", "backup_manager_thaw_all"]
]
assert messages == [
{"type": "backup/end"},
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage="home_assistant"
),
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage="addons"
),
_make_backup_message_for_assert(
action="thaw_all",
reference=None,
stage="addons",
done=True,
progress=100,
),
]
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_freeze_thaw_timeout(
coresys: CoreSys, ha_ws_client: AsyncMock, caplog: pytest.LogCaptureFixture
):
"""Test manual freeze ends due to timeout expiration."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
await coresys.backups.freeze_all(timeout=0.01)
assert coresys.core.state == CoreState.FREEZE
await asyncio.sleep(0)
assert any(
call.args[0] == {"type": "backup/start"}
for call in ha_ws_client.async_send_command.call_args_list
)
ha_ws_client.async_send_command.reset_mock()
await asyncio.sleep(0.02)
assert coresys.core.state == CoreState.RUNNING
assert any(
call.args[0] == {"type": "backup/end"}
for call in ha_ws_client.async_send_command.call_args_list
)
assert "Timeout waiting for signal to thaw after manual freeze" in caplog.text
async def test_cannot_manually_thaw_normal_freeze(coresys: CoreSys):
"""Test thaw_all cannot be used unless freeze was started by freeze_all method."""
await coresys.core.set_state(CoreState.FREEZE)
with pytest.raises(BackupError):
await coresys.backups.thaw_all()
@pytest.mark.usefixtures("supervisor_internet", "tmp_supervisor_data", "path_extern")
async def test_restore_only_reloads_ingress_on_change(
coresys: CoreSys, install_app_ssh: App
):
"""Test restore only tells core to reload ingress when something has changed."""
install_app_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_no_ingress: Backup = await coresys.backups.do_backup_partial(
apps=["local_ssh"]
)
install_app_ssh.ingress_panel = True
await install_app_ssh.save_persist()
backup_with_ingress: Backup = await coresys.backups.do_backup_partial(
apps=["local_ssh"]
)
async def mock_is_running(*_) -> bool:
return True
with (
patch.object(HomeAssistantCore, "is_running", new=mock_is_running),
patch.object(AppModel, "_validate_availability"),
patch.object(DockerApp, "attach"),
patch.object(HomeAssistantAPI, "make_request") as make_request,
):
make_request.return_value.__aenter__.return_value.status = 200
# Has ingress before and after - not called
await coresys.backups.do_restore_partial(
backup_with_ingress, apps=["local_ssh"]
)
make_request.assert_not_called()
# Restore removes ingress - tell Home Assistant
await coresys.backups.do_restore_partial(backup_no_ingress, apps=["local_ssh"])
make_request.assert_called_once_with(
"delete", "api/hassio_push/panel/local_ssh"
)
# No ingress before or after - not called
make_request.reset_mock()
await coresys.backups.do_restore_partial(backup_no_ingress, apps=["local_ssh"])
make_request.assert_not_called()
# Restore adds ingress - tell Home Assistant
await coresys.backups.do_restore_partial(
backup_with_ingress, apps=["local_ssh"]
)
make_request.assert_called_once_with("post", "api/hassio_push/panel/local_ssh")
@pytest.mark.usefixtures("supervisor_internet", "tmp_supervisor_data", "path_extern")
async def test_restore_new_app(coresys: CoreSys, install_app_example: App):
"""Test restore installing new app."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
assert not install_app_example.path_data.exists()
assert not install_app_example.path_config.exists()
backup: Backup = await coresys.backups.do_backup_partial(apps=["local_example"])
await coresys.apps.uninstall("local_example")
assert "local_example" not in coresys.apps.local
with (
patch.object(AppModel, "_validate_availability"),
patch.object(DockerApp, "attach"),
):
assert await coresys.backups.do_restore_partial(backup, apps=["local_example"])
assert "local_example" in coresys.apps.local
assert install_app_example.path_data.exists()
assert install_app_example.path_config.exists()
@pytest.mark.usefixtures("supervisor_internet", "tmp_supervisor_data", "path_extern")
async def test_restore_preserves_data_config(
coresys: CoreSys, install_app_example: App
):
"""Test restore preserves data and config."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
install_app_example.path_data.mkdir()
(test_data := install_app_example.path_data / "data.txt").touch()
install_app_example.path_config.mkdir()
(test_config := install_app_example.path_config / "config.yaml").touch()
backup: Backup = await coresys.backups.do_backup_partial(apps=["local_example"])
(test_config2 := install_app_example.path_config / "config2.yaml").touch()
await coresys.apps.uninstall("local_example")
assert not install_app_example.path_data.exists()
assert install_app_example.path_config.exists()
assert test_config2.exists()
with (
patch.object(AppModel, "_validate_availability"),
patch.object(DockerApp, "attach"),
):
assert await coresys.backups.do_restore_partial(backup, apps=["local_example"])
assert test_data.exists()
assert test_config.exists()
assert not test_config2.exists()


@pytest.mark.usefixtures(
    "tmp_supervisor_data", "path_extern", "mount_propagation", "mock_is_mount"
)
async def test_backup_to_mount_bypasses_free_space_condition(
    coresys: CoreSys, all_dbus_services: dict[str, DBusServiceMock]
):
    """Test backing up to a mount bypasses the check on local free space."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda _: 0.1

    # These fail due to lack of local free space
    with pytest.raises(BackupJobError):
        await coresys.backups.do_backup_full()
    with pytest.raises(BackupJobError):
        await coresys.backups.do_backup_partial(folders=["media"])

    systemd_service: SystemdService = all_dbus_services["systemd"]
    systemd_service.response_get_unit = [
        DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
        "/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
        DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
        "/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
        "/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
        "/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
    ]

    # Add a backup mount
    await coresys.mounts.load()
    await coresys.mounts.create_mount(
        Mount.from_dict(
            coresys,
            {
                "name": "backup_test",
                "usage": "backup",
                "type": "cifs",
                "server": "test.local",
                "share": "test",
            },
        )
    )
    mount = coresys.mounts.get("backup_test")

    # These succeed because local free space does not matter when using a mount
    await coresys.backups.do_backup_full(location=mount)
    await coresys.backups.do_backup_partial(folders=["media"], location=mount)


@pytest.mark.parametrize(
    "partial_backup,exclude_db_setting",
    [(False, True), (True, True), (False, False), (True, False)],
)
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_skip_homeassistant_database(
    coresys: CoreSys, partial_backup: bool, exclude_db_setting: bool | None
):
    """Test exclude database option skips database in backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.jobs.ignore_conditions = [
        JobCondition.INTERNET_HOST,
        JobCondition.INTERNET_SYSTEM,
    ]
    coresys.homeassistant.version = AwesomeVersion("2023.09.0")
    coresys.homeassistant.backups_exclude_database = exclude_db_setting

    test_file = coresys.config.path_homeassistant / "configuration.yaml"
    test_db = coresys.config.path_homeassistant / "home-assistant_v2.db"
    test_db_wal = coresys.config.path_homeassistant / "home-assistant_v2.db-wal"
    test_db_shm = coresys.config.path_homeassistant / "home-assistant_v2.db-shm"

    def setup_1():
        test_db.touch()
        test_db_wal.touch()
        test_db_shm.touch()
        write_json_file(test_file, {"default_config": {}})

    await coresys.run_in_executor(setup_1)

    kwargs = {} if exclude_db_setting else {"homeassistant_exclude_database": True}
    if partial_backup:
        backup: Backup = await coresys.backups.do_backup_partial(
            homeassistant=True, **kwargs
        )
    else:
        backup: Backup = await coresys.backups.do_backup_full(**kwargs)

    def setup_2():
        test_file.unlink()
        write_json_file(test_db, {"hello": "world"})
        write_json_file(test_db_wal, {"hello": "world"})

    await coresys.run_in_executor(setup_2)

    with (
        patch.object(HomeAssistantCore, "update"),
        patch.object(HomeAssistantCore, "start"),
    ):
        await coresys.backups.do_restore_partial(backup, homeassistant=True)

    def test_assertions():
        assert read_json_file(test_file) == {"default_config": {}}
        assert read_json_file(test_db) == {"hello": "world"}
        assert read_json_file(test_db_wal) == {"hello": "world"}
        assert not test_db_shm.exists()

    await coresys.run_in_executor(test_assertions)


@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
@pytest.mark.parametrize(
    ("backup_locations", "location_name", "healthy_expected"),
    [
        (["test"], "test", True),
        ([None], None, False),
    ],
    indirect=["backup_locations"],
)
async def test_backup_remove_error(
    coresys: CoreSys,
    backup_locations: list[LOCATION_TYPE],
    location_name: str | None,
    healthy_expected: bool,
):
    """Test error handling when removing a backup."""
    location: LOCATION_TYPE = backup_locations[0]
    backup_base_path = coresys.backups._get_base_path(location)  # pylint: disable=protected-access
    backup_base_path.mkdir(exist_ok=True)
    copy(get_fixture_path("backup_example.tar"), backup_base_path)

    await coresys.backups.reload()
    assert (backup := coresys.backups.get("7fed74c8"))

    assert location_name in backup.all_locations
    backup.all_locations[location_name].path = (tar_file_mock := MagicMock())
    tar_file_mock.unlink.side_effect = (err := OSError())

    err.errno = errno.EBUSY
    with pytest.raises(BackupError):
        await coresys.backups.remove(backup)
    assert coresys.core.healthy is True

    err.errno = errno.EBADMSG
    with pytest.raises(BackupError):
        await coresys.backups.remove(backup)
    assert coresys.core.healthy is healthy_expected


@pytest.mark.parametrize(
    "error_path,healthy_expected",
    [(Path("/data/backup"), False), (Path("/data/mounts/backup_test"), True)],
)
@pytest.mark.usefixtures("path_extern", "mount_propagation")
async def test_reload_error(
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    error_path: Path,
    healthy_expected: bool,
    mock_is_mount: MagicMock,
):
    """Test error during reload."""
    err = OSError()

    def mock_is_dir(path: Path) -> bool:
        """Mock of is_dir."""
        if path == error_path:
            raise err
        return True

    # Add a backup mount
    await coresys.mounts.load()
    await coresys.mounts.create_mount(
        Mount.from_dict(
            coresys,
            {
                "name": "backup_test",
                "usage": "backup",
                "type": "cifs",
                "server": "test.local",
                "share": "test",
            },
        )
    )

    mock_is_mount.return_value = False
    with (
        patch("supervisor.backups.manager.Path.is_dir", new=mock_is_dir),
        patch("supervisor.backups.manager.Path.glob", return_value=[]),
    ):
        err.errno = errno.EBUSY
        await coresys.backups.reload()

        assert "Could not list backups" in caplog.text
        assert coresys.core.healthy is True

        caplog.clear()
        err.errno = errno.EBADMSG
        await coresys.backups.reload()

        assert "Could not list backups" in caplog.text
        assert coresys.core.healthy is healthy_expected


@pytest.mark.parametrize(
    "pre_backup_error",
    [
        {
            "code": "pre_backup_actions_failed",
            "message": "Database migration in progress",
        },
        {"code": "unknown_command", "message": "Unknown command."},
    ],
)
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_core_pre_backup_actions_failed(
    coresys: CoreSys,
    ha_ws_client: AsyncMock,
    caplog: pytest.LogCaptureFixture,
    pre_backup_error: dict[str, str],
):
    """Test pre-backup actions failed in HA core stops backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    ha_ws_client.ha_version = AwesomeVersion("2024.7.0")
    ha_ws_client.async_send_command.return_value = {
        "error": pre_backup_error,
        "id": 1,
        "success": False,
        "type": "result",
    }

    assert not await coresys.backups.do_backup_full()
    assert (
        f"Preparing backup of Home Assistant Core failed due to: {pre_backup_error['message']}"
        in caplog.text
    )


@pytest.mark.usefixtures("mount_propagation", "mock_is_mount", "path_extern")
async def test_reload_multiple_locations(coresys: CoreSys, tmp_supervisor_data: Path):
    """Test reload with a backup that exists in multiple locations."""
    (mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
    await coresys.mounts.load()
    mount = Mount.from_dict(
        coresys,
        {
            "name": "backup_test",
            "usage": "backup",
            "type": "cifs",
            "server": "test.local",
            "share": "test",
        },
    )
    await coresys.mounts.create_mount(mount)

    assert not coresys.backups.list_backups

    backup_file = get_fixture_path("backup_example.tar")
    copy(backup_file, tmp_supervisor_data / "core/backup")
    await coresys.backups.reload()

    assert coresys.backups.list_backups
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.location == ".cloud_backup"
    assert backup.locations == [".cloud_backup"]
    assert backup.all_locations.keys() == {".cloud_backup"}

    copy(backup_file, tmp_supervisor_data / "backup")
    await coresys.backups.reload()

    assert coresys.backups.list_backups
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.location is None
    assert backup.locations == [None]
    assert backup.all_locations.keys() == {".cloud_backup", None}

    copy(backup_file, mount_dir)
    await coresys.backups.reload()

    assert coresys.backups.list_backups
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.location in {None, "backup_test"}
    assert None in backup.locations
    assert "backup_test" in backup.locations
    assert backup.all_locations.keys() == {".cloud_backup", None, "backup_test"}


@pytest.mark.usefixtures("mount_propagation", "mock_is_mount", "path_extern")
async def test_partial_reload_multiple_locations(
    coresys: CoreSys, tmp_supervisor_data: Path
):
    """Test a partial reload with a backup that exists in multiple locations."""
    (mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
    await coresys.mounts.load()
    mount = Mount.from_dict(
        coresys,
        {
            "name": "backup_test",
            "usage": "backup",
            "type": "cifs",
            "server": "test.local",
            "share": "test",
        },
    )
    await coresys.mounts.create_mount(mount)

    assert not coresys.backups.list_backups

    backup_file = get_fixture_path("backup_example.tar")
    copy(backup_file, tmp_supervisor_data / "core/backup")
    await coresys.backups.reload()

    assert coresys.backups.list_backups
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.location == ".cloud_backup"
    assert backup.locations == [".cloud_backup"]
    assert backup.all_locations.keys() == {".cloud_backup"}

    copy(backup_file, tmp_supervisor_data / "backup")
    await coresys.backups.reload()

    assert coresys.backups.list_backups
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.location is None
    assert backup.locations == [None]
    assert backup.all_locations.keys() == {".cloud_backup", None}

    copy(backup_file, mount_dir)
    await coresys.backups.reload()

    assert coresys.backups.list_backups
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.location is None
    assert backup.locations == [None, "backup_test"]
    assert backup.all_locations.keys() == {".cloud_backup", None, "backup_test"}


@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_backup_remove_multiple_locations(coresys: CoreSys):
    """Test removing a backup that exists in multiple locations."""
    backup_file = get_fixture_path("backup_example.tar")
    location_1 = Path(copy(backup_file, coresys.config.path_backup))
    location_2 = Path(copy(backup_file, coresys.config.path_core_backup))

    await coresys.backups.reload()
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.all_locations == {
        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
        ".cloud_backup": BackupLocation(
            path=location_2, protected=False, size_bytes=10240
        ),
    }

    await coresys.backups.remove(backup)
    assert not location_1.exists()
    assert not location_2.exists()
    assert not coresys.backups.get("7fed74c8")


@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_backup_remove_one_location_of_multiple(coresys: CoreSys):
    """Test removing a backup that exists in multiple locations from one location."""
    backup_file = get_fixture_path("backup_example.tar")
    location_1 = Path(copy(backup_file, coresys.config.path_backup))
    location_2 = Path(copy(backup_file, coresys.config.path_core_backup))

    await coresys.backups.reload()
    assert (backup := coresys.backups.get("7fed74c8"))
    assert backup.all_locations == {
        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
        ".cloud_backup": BackupLocation(
            path=location_2, protected=False, size_bytes=10240
        ),
    }

    await coresys.backups.remove(backup, locations=[".cloud_backup"])
    assert location_1.exists()
    assert not location_2.exists()
    assert coresys.backups.get("7fed74c8")
    assert backup.all_locations == {
        None: BackupLocation(path=location_1, protected=False, size_bytes=10240),
    }


@pytest.mark.usefixtures("tmp_supervisor_data", "supervisor_internet")
async def test_app_backup_excludes(coresys: CoreSys, install_app_example: App):
    """Test backup excludes option for apps."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    install_app_example.path_data.mkdir(parents=True)
    (test1 := install_app_example.path_data / "test1").touch()
    (test_dir := install_app_example.path_data / "test_dir").mkdir()
    (test2 := test_dir / "test2").touch()
    (test3 := test_dir / "test3").touch()

    install_app_example.data["backup_exclude"] = ["test1", "*/test2"]
    backup = await coresys.backups.do_backup_partial(apps=["local_example"])

    test1.unlink()
    test2.unlink()
    test3.unlink()
    test_dir.rmdir()

    await coresys.backups.do_restore_partial(backup, apps=["local_example"])
    assert not test1.exists()
    assert not test2.exists()
    assert test_dir.is_dir()
    assert test3.exists()


@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_remove_non_existing_backup_raises(coresys: CoreSys):
    """Test removing a backup whose file no longer exists raises."""
    location: LOCATION_TYPE = None
    backup_base_path = coresys.backups._get_base_path(location)  # pylint: disable=protected-access
    backup_base_path.mkdir(exist_ok=True)
    copy(get_fixture_path("backup_example.tar"), backup_base_path)

    await coresys.backups.reload()
    assert (backup := coresys.backups.get("7fed74c8"))

    assert None in backup.all_locations
    backup.all_locations[None].path = (tar_file_mock := MagicMock())
    tar_file_mock.unlink.side_effect = (err := FileNotFoundError())
    err.errno = errno.ENOENT

    with pytest.raises(BackupFileNotFoundError):
        await coresys.backups.remove(backup)


@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
@pytest.mark.parametrize(
    ("error_num", "unhealthy", "default_location", "additional_location"),
    [
        (errno.EBUSY, False, None, ".cloud_backup"),
        (errno.EBUSY, False, ".cloud_backup", None),
        (errno.EBADMSG, True, None, ".cloud_backup"),
        (errno.EBADMSG, True, ".cloud_backup", None),
    ],
)
async def test_backup_multiple_locations_oserror(
    coresys: CoreSys,
    error_num: int,
    unhealthy: bool,
    default_location: str | None,
    additional_location: str | None,
):
    """Test an OSError while copying a backup to an additional location."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000

    with patch("supervisor.backups.manager.copy", side_effect=(err := OSError())):
        err.errno = error_num
        backup: Backup | None = await coresys.backups.do_backup_full(
            name="test",
            location=default_location,
            additional_locations=[additional_location],
        )

    assert backup
    assert coresys.backups.get(backup.slug) == backup
    assert backup.location == default_location
    assert additional_location not in backup.all_locations
    assert (
        UnhealthyReason.OSERROR_BAD_MESSAGE in coresys.resolution.unhealthy
    ) is unhealthy


@pytest.mark.parametrize("same_mount", [True, False])
async def test_get_upload_path_for_backup_location(coresys: CoreSys, same_mount: bool):
    """Test get_upload_path_for_location with local backup location."""
    manager = BackupManager(coresys)

    target_path = coresys.config.path_backup
    tmp_path = coresys.config.path_tmp

    def make_stat_mock(target_path: Path, tmp_path: Path, same_mount: bool):
        def _mock_stat(self):
            if self == target_path:
                return SimpleNamespace(st_dev=1)
            if self == tmp_path:
                return SimpleNamespace(st_dev=1 if same_mount else 2)
            raise ValueError(f"Unexpected path: {self}")

        return _mock_stat

    with patch(
        "pathlib.Path.stat", new=make_stat_mock(target_path, tmp_path, same_mount)
    ):
        result = await manager.get_upload_path_for_location(None)

    if same_mount:
        assert result == tmp_path
    else:
        assert result == target_path


@pytest.mark.usefixtures(
    "tmp_supervisor_data", "path_extern", "mount_propagation", "mock_is_mount"
)
async def test_get_upload_path_for_mount_location(coresys: CoreSys):
    """Test get_upload_path_for_location with a Mount location."""
    manager = BackupManager(coresys)

    await coresys.mounts.load()
    mount = Mount.from_dict(
        coresys,
        {
            "name": "test_mount",
            "usage": "backup",
            "type": "cifs",
            "server": "server.local",
            "share": "test",
        },
    )
    await coresys.mounts.create_mount(mount)

    result = await manager.get_upload_path_for_location(mount)

    assert result == mount.local_where


@pytest.mark.usefixtures(
    "supervisor_internet", "tmp_supervisor_data", "path_extern", "install_app_example"
)
async def test_backup_app_skips_uninstalled(
    coresys: CoreSys, caplog: pytest.LogCaptureFixture
):
    """Test backup skips an app that is uninstalled during the backup."""
    await coresys.core.set_state(CoreState.RUNNING)
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    assert "local_example" in coresys.apps.local
    orig_store_apps = Backup.store_apps

    async def mock_store_apps(*args, **kwargs):
        # Mock an uninstall during the backup process
        await coresys.apps.uninstall("local_example")
        await orig_store_apps(*args, **kwargs)

    with patch.object(Backup, "store_apps", new=mock_store_apps):
        backup: Backup = await coresys.backups.do_backup_partial(
            apps=["local_example"], folders=["ssl"]
        )

    assert "local_example" not in coresys.apps.local
    assert not backup.apps
    assert (
        "Skipping backup of app local_example because it has been uninstalled"
        in caplog.text
    )