1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-02-15 07:27:13 +00:00

Mount status checks look at connection (#4882)

* Mount status checks look at connection

* Fix tests and refactor to fixture

* Fix test
This commit is contained in:
Mike Degatano
2024-02-12 11:32:54 -05:00
committed by GitHub
parent 8e71d69a64
commit b5bf270d22
12 changed files with 319 additions and 102 deletions

View File

@@ -52,7 +52,7 @@ SHARE_TEST_DATA = {
@pytest.fixture(name="mount")
async def fixture_mount(
coresys: CoreSys, tmp_supervisor_data, path_extern, mount_propagation
coresys: CoreSys, tmp_supervisor_data, path_extern, mount_propagation, mock_is_mount
) -> Mount:
"""Add an initial mount and load mounts."""
mount = Mount.from_dict(coresys, MEDIA_TEST_DATA)
@@ -328,6 +328,7 @@ async def test_create_mount(
tmp_supervisor_data,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test creating a mount."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -459,7 +460,11 @@ async def test_remove_reload_mount_missing(coresys: CoreSys, mount_propagation):
async def test_save_data(
coresys: CoreSys, tmp_supervisor_data: Path, path_extern, mount_propagation
coresys: CoreSys,
tmp_supervisor_data: Path,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test saving mount config data."""
# Replace mount manager with one that doesn't have save_data mocked
@@ -639,6 +644,7 @@ async def test_create_share_mount(
tmp_supervisor_data,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test creating a share mount."""
systemd_service: SystemdService = all_dbus_services["systemd"]

View File

@@ -1,18 +1,19 @@
"""Tests for mounts."""
from __future__ import annotations
import asyncio
import os
from pathlib import Path
import stat
from typing import Any
from unittest.mock import patch
from unittest.mock import MagicMock
from dbus_fast import DBusError, ErrorType, Variant
import pytest
from supervisor.coresys import CoreSys
from supervisor.dbus.const import UnitActiveState
from supervisor.exceptions import MountError, MountInvalidError
from supervisor.exceptions import MountActivationError, MountError, MountInvalidError
from supervisor.mounts.const import MountCifsVersion, MountType, MountUsage
from supervisor.mounts.mount import CIFSMount, Mount, NFSMount
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
@@ -49,6 +50,7 @@ async def test_cifs_mount(
path_extern,
additional_data: dict[str, Any],
expected_options: list[str],
mock_is_mount,
):
"""Test CIFS mount."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -143,6 +145,7 @@ async def test_cifs_mount_read_only(
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data: Path,
path_extern,
mock_is_mount,
):
"""Test a read-only cifs mount."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -188,6 +191,7 @@ async def test_nfs_mount(
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data: Path,
path_extern,
mock_is_mount,
):
"""Test NFS mount."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -247,6 +251,7 @@ async def test_nfs_mount_read_only(
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data: Path,
path_extern,
mock_is_mount,
):
"""Test NFS mount."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -292,6 +297,7 @@ async def test_load(
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data,
path_extern,
mock_is_mount,
):
"""Test mount loading."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -365,13 +371,12 @@ async def test_load(
systemd_unit_service.active_state = "activating"
mount = Mount.from_dict(coresys, mount_data)
async def mock_activation_finished(*_):
assert mount.state == UnitActiveState.ACTIVATING
assert systemd_service.ReloadOrRestartUnit.calls == []
systemd_unit_service.active_state = ["failed", "active"]
with patch("supervisor.mounts.mount.asyncio.sleep", new=mock_activation_finished):
await mount.load()
load_task = asyncio.create_task(mount.load())
await asyncio.sleep(0.1)
systemd_unit_service.emit_properties_changed({"ActiveState": "failed"})
await asyncio.sleep(0.1)
systemd_unit_service.emit_properties_changed({"ActiveState": "active"})
await load_task
assert mount.state == UnitActiveState.ACTIVE
assert systemd_service.StartTransientUnit.calls == []
@@ -381,7 +386,10 @@ async def test_load(
async def test_unmount(
coresys: CoreSys, all_dbus_services: dict[str, DBusServiceMock], path_extern
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock],
path_extern,
mock_is_mount,
):
"""Test unmounting."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -418,6 +426,7 @@ async def test_mount_failure(
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data,
path_extern,
mock_is_mount,
):
"""Test failure to mount."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -461,18 +470,15 @@ async def test_mount_failure(
systemd_service.GetUnit.calls.clear()
systemd_unit_service.active_state = "activating"
async def mock_activation_finished(*_):
assert mount.state == UnitActiveState.ACTIVATING
systemd_unit_service.active_state = "failed"
with patch(
"supervisor.mounts.mount.asyncio.sleep", new=mock_activation_finished
), pytest.raises(MountError):
await mount.mount()
load_task = asyncio.create_task(mount.mount())
await asyncio.sleep(0.1)
systemd_unit_service.emit_properties_changed({"ActiveState": "failed"})
with pytest.raises(MountError):
await load_task
assert mount.state == UnitActiveState.FAILED
assert len(systemd_service.StartTransientUnit.calls) == 1
assert len(systemd_service.GetUnit.calls) == 2
assert len(systemd_service.GetUnit.calls) == 1
async def test_unmount_failure(
@@ -500,11 +506,11 @@ async def test_unmount_failure(
assert len(systemd_service.StopUnit.calls) == 1
# If error is NoSuchUnit then ignore, it has already been unmounted
# If unit is missing we skip unmounting, its already gone
systemd_service.StopUnit.calls.clear()
systemd_service.response_stop_unit = ERROR_NO_UNIT
systemd_service.response_get_unit = ERROR_NO_UNIT
await mount.unmount()
assert len(systemd_service.StopUnit.calls) == 1
assert systemd_service.StopUnit.calls == []
async def test_reload_failure(
@@ -512,6 +518,7 @@ async def test_reload_failure(
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data,
path_extern,
mock_is_mount,
):
"""Test failure to reload."""
systemd_service: SystemdService = all_dbus_services["systemd"]
@@ -610,7 +617,7 @@ async def test_mount_local_where_invalid(
assert systemd_service.StartTransientUnit.calls == []
async def test_update_clears_issue(coresys: CoreSys, path_extern):
async def test_update_clears_issue(coresys: CoreSys, path_extern, mock_is_mount):
"""Test updating mount data clears corresponding failed mount issue if active."""
mount = Mount.from_dict(
coresys,
@@ -635,8 +642,88 @@ async def test_update_clears_issue(coresys: CoreSys, path_extern):
assert mount.failed_issue in coresys.resolution.issues
assert len(coresys.resolution.suggestions_for_issue(mount.failed_issue)) == 2
await mount.update()
assert await mount.update() is True
assert mount.state == UnitActiveState.ACTIVE
assert mount.failed_issue not in coresys.resolution.issues
assert not coresys.resolution.suggestions_for_issue(mount.failed_issue)
async def test_update_leaves_issue_if_down(
coresys: CoreSys, mock_is_mount: MagicMock, path_extern
):
"""Test issue is left if system is down after update (is_mount is false)."""
mount = Mount.from_dict(
coresys,
{
"name": "test",
"usage": "media",
"type": "cifs",
"server": "test.local",
"share": "share",
},
)
assert mount.failed_issue not in coresys.resolution.issues
coresys.resolution.create_issue(
IssueType.MOUNT_FAILED,
ContextType.MOUNT,
reference="test",
suggestions=[SuggestionType.EXECUTE_RELOAD, SuggestionType.EXECUTE_REMOVE],
)
assert mount.failed_issue in coresys.resolution.issues
assert len(coresys.resolution.suggestions_for_issue(mount.failed_issue)) == 2
mock_is_mount.return_value = False
assert (await mount.update()) is False
assert mount.state == UnitActiveState.ACTIVE
assert mount.failed_issue in coresys.resolution.issues
assert len(coresys.resolution.suggestions_for_issue(mount.failed_issue)) == 2
async def test_mount_fails_if_down(
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data: Path,
mock_is_mount: MagicMock,
path_extern,
):
"""Test mount fails if system is down (is_mount is false)."""
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_service.StartTransientUnit.calls.clear()
mount_data = {
"name": "test",
"usage": "media",
"type": "nfs",
"server": "test.local",
"path": "/media/camera",
"port": 1234,
"read_only": False,
}
mount: NFSMount = Mount.from_dict(coresys, mount_data)
mock_is_mount.return_value = False
with pytest.raises(MountActivationError):
await mount.mount()
assert mount.state == UnitActiveState.ACTIVE
assert mount.local_where.exists()
assert mount.local_where.is_dir()
assert systemd_service.StartTransientUnit.calls == [
(
"mnt-data-supervisor-mounts-test.mount",
"fail",
[
["Options", Variant("s", "port=1234,soft,timeo=200")],
["Type", Variant("s", "nfs")],
["Description", Variant("s", "Supervisor nfs mount: test")],
["What", Variant("s", "test.local:/media/camera")],
],
[],
)
]