1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-07-03 11:55:38 +01:00
Files
supervisor/tests/api/test_mounts.py
Stefan Agner ed91b18c4b tests: enable flake8-pytest-style (PT) ruff rules (#6857)
* tests: enable flake8-pytest-style (PT) ruff rules

Enable the `PT` ruff rule set and fix the resulting violations across the
test suite:

- PT006: pass parametrize argument names as tuples instead of a single
  comma-separated string.
- PT022: switch fixtures that have no teardown from `yield` to `return`
  so the lack of cleanup is obvious at a glance.
- PT011: add `match=` to broad `pytest.raises(ValueError)` blocks so the
  expected error is anchored to a specific message.
- PT012: hoist setup (patches, branching) out of `pytest.raises()`
  blocks so only the call that is expected to raise remains inside.
- PT013: replace `from pytest import X` with `import pytest` and access
  attributes via the module.
- PT015: replace `try/except` + `assert False` patterns with
  `pytest.raises(...)`.
- PT017: replace `assert` on exceptions inside `except` blocks with
  `pytest.raises(...) as exc_info` and assert on `exc_info.value`.

No behavioral changes to the tests; the full suite still passes.

* tests: address review feedback on PT ruff rule enablement

- Fix fixture return-type annotations after switching `yield` to `return`
  in tests/conftest.py: drop the `Generator[...]`/`AsyncGenerator[...]`
  wrapper for `dns_manager_service`, `supervisor_internet`, `websession`,
  and `mock_update_data` so the annotation matches what the fixture
  actually returns.
- Correct the return-type annotation of `fixture_ip6config_service` from
  `IP4ConfigService` to `IP6ConfigService`.
- Fix recurring "excepiton" typo in tests/utils/test_exception_helper.py.

* tests: verify backup cleanup on permission error

After `test_new_backup_permission_error` raises `BackupPermissionError`,
assert that no tarfile was left behind and `tmp_path` is empty. The
previous version only checked that the exception was raised, which
missed any regression where a partial tarfile would survive the failed
create.

* tests: rename DNS_GOOD_V6 to DNS_V6_UNSUPPORTED

The constant was named "good" but its tests assert that the URLs are
rejected by the DNS validator. The IPv6 URLs are well-formed but
currently rejected because IPv6 doesn't work with the Docker network
(see `dns_url` in supervisor/validate.py). Rename the constant and the
related test to make the intent obvious.
2026-05-20 22:17:54 +02:00

917 lines
28 KiB
Python

"""Test mounts API."""
import asyncio
import errno
from unittest.mock import patch
from aiohttp.test_utils import TestClient
from dbus_fast import DBusError, ErrorType
import pytest
from supervisor.backups.manager import BackupManager
from supervisor.coresys import CoreSys
from supervisor.mounts.mount import Mount
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.systemd import Systemd as SystemdService
from tests.dbus_service_mocks.systemd_unit import SystemdUnit as SystemdUnitService
@pytest.fixture(name="mount")
async def fixture_mount(
coresys: CoreSys, tmp_supervisor_data, path_extern, mount_propagation
) -> Mount:
"""Add an initial mount and load mounts."""
mount = Mount.from_dict(
coresys,
{
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
coresys.mounts._mounts = {"backup_test": mount} # pylint: disable=protected-access
coresys.mounts.default_backup_mount = mount
await coresys.mounts.load()
return mount
async def test_api_mounts_info(api_client_with_prefix: tuple[TestClient, str]):
"""Test mounts info api."""
api_client, prefix = api_client_with_prefix
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == []
async def test_api_create_mount(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
tmp_supervisor_data,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test creating a mount via API."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
"version": "2.0",
},
)
result = await resp.json()
assert result["result"] == "ok"
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == [
{
"version": "2.0",
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
"state": "active",
"read_only": False,
"user_path": None,
}
]
coresys.mounts.save_data.assert_called_once()
async def test_api_create_error_mount_exists(
api_client_with_prefix: tuple[TestClient, str], mount
):
"""Test create mount API errors when mount exists."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert result["message"] == "A mount already exists with name backup_test"
async def test_api_create_dbus_error_mount_not_added(
api_client_with_prefix: tuple[TestClient, str],
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data,
path_extern,
mount_propagation,
):
"""Test mount not added to list of mounts if a dbus error occurs."""
api_client, prefix = api_client_with_prefix
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_service.response_get_unit = DBusError(
"org.freedesktop.systemd1.NoSuchUnit", "error"
)
systemd_service.response_start_transient_unit = DBusError(ErrorType.FAILED, "fail")
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert result["message"] == "Could not mount backup_test due to: fail"
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == []
systemd_service.response_get_unit = [
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
]
systemd_service.response_start_transient_unit = "/org/freedesktop/systemd1/job/7623"
systemd_unit_service.active_state = ["failed", "failed", "inactive"]
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert (
result["message"]
== "Mounting backup_test did not succeed. Check host logs for errors from mount or systemd unit mnt-data-supervisor-mounts-backup_test.mount for details."
)
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == []
@pytest.mark.parametrize("os_available", ["9.5"], indirect=True)
async def test_api_create_mount_fails_os_out_of_date(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
os_available,
mount_propagation,
):
"""Test creating a mount via API fails when mounting isn't supported due to OS version."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert (
result["message"]
== "'MountManager.create_mount' blocked from execution, mounting not supported on system"
)
async def test_api_create_mount_fails_missing_mount_propagation(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
os_available,
):
"""Test creating a mount via API fails when mounting isn't supported due to container config."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert (
result["message"]
== "'MountManager.create_mount' blocked from execution, mounting not supported on system"
)
async def test_api_update_mount(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock],
mount,
mock_is_mount,
):
"""Test updating a mount via API."""
api_client, prefix = api_client_with_prefix
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_service.mock_systemd_unit = systemd_unit_service
resp = await api_client.put(
f"{prefix}/mounts/backup_test",
json={
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "new_backups",
},
)
result = await resp.json()
assert result["result"] == "ok"
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == [
{
"version": None,
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "new_backups",
"state": "active",
"read_only": False,
"user_path": None,
}
]
coresys.mounts.save_data.assert_called_once()
async def test_api_update_dbus_error_mount_remains(
api_client_with_prefix: tuple[TestClient, str],
all_dbus_services: dict[str, DBusServiceMock],
mount,
tmp_supervisor_data,
path_extern,
mount_propagation,
):
"""Test mount remains in list with unsuccessful state if dbus error occurs during update."""
api_client, prefix = api_client_with_prefix
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_unit_service.active_state = ["failed", "inactive"]
systemd_service.response_get_unit = [
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
]
systemd_service.response_start_transient_unit = DBusError(ErrorType.FAILED, "fail")
resp = await api_client.put(
f"{prefix}/mounts/backup_test",
json={
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups1",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert result["message"] == "Could not mount backup_test due to: fail"
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == [
{
"version": None,
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
"state": None,
"read_only": False,
"user_path": None,
}
]
systemd_service.response_get_unit = [
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
DBusError("org.freedesktop.systemd1.NoSuchUnit", "error"),
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
"/org/freedesktop/systemd1/unit/tmp_2dyellow_2emount",
]
systemd_service.response_start_transient_unit = "/org/freedesktop/systemd1/job/7623"
systemd_unit_service.active_state = [
"failed",
"inactive",
"inactive",
"failed",
"inactive",
]
resp = await api_client.put(
f"{prefix}/mounts/backup_test",
json={
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups2",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert (
result["message"]
== "Mounting backup_test did not succeed. Check host logs for errors from mount or systemd unit mnt-data-supervisor-mounts-backup_test.mount for details."
)
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == [
{
"version": None,
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
"state": None,
"read_only": False,
"user_path": None,
}
]
async def test_api_reload_mount(
api_client_with_prefix: tuple[TestClient, str],
all_dbus_services: dict[str, DBusServiceMock],
mount,
mock_is_mount,
):
"""Test reloading a mount via API."""
api_client, prefix = api_client_with_prefix
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_service.ReloadOrRestartUnit.calls.clear()
# Healthy mount (probe passes): API reload completes without touching
# systemd — the periodic refresh + probe-as-fast-path means a healthy
# mount only gets reloaded when the share has actually gone bad.
resp = await api_client.post(f"{prefix}/mounts/backup_test/reload")
result = await resp.json()
assert result["result"] == "ok"
assert systemd_service.ReloadOrRestartUnit.calls == []
# Probe failure forces the reload to actually go to systemd.
with patch(
"supervisor.mounts.mount._probe_network_mount",
side_effect=OSError(errno.EHOSTDOWN, "Host is down"),
):
resp = await api_client.post(f"{prefix}/mounts/backup_test/reload")
await resp.json()
assert systemd_service.ReloadOrRestartUnit.calls == [
("mnt-data-supervisor-mounts-backup_test.mount", "fail")
]
async def test_api_delete_mount(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock],
mount,
):
"""Test deleting a mount via API."""
api_client, prefix = api_client_with_prefix
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_service.mock_systemd_unit = systemd_unit_service
resp = await api_client.delete(f"{prefix}/mounts/backup_test")
result = await resp.json()
assert result["result"] == "ok"
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == []
coresys.mounts.save_data.assert_called_once()
async def test_api_create_backup_mount_sets_default(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
tmp_supervisor_data,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test creating backup mounts sets default if not set."""
api_client, prefix = api_client_with_prefix
await coresys.mounts.load()
assert coresys.mounts.default_backup_mount is None
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount.name == "backup_test"
# Confirm the default does not change if mount created after its been set
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test_2",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount.name == "backup_test"
async def test_update_backup_mount_changes_default(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock],
mount,
mock_is_mount,
):
"""Test updating a backup mount may unset the default."""
api_client, prefix = api_client_with_prefix
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_service.mock_systemd_unit = systemd_unit_service
# Make another backup mount for testing
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "other_backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
result = await resp.json()
assert result["result"] == "ok"
# Changing this mount should have no effect on the default
resp = await api_client.put(
f"{prefix}/mounts/other_backup_test",
json={
"type": "cifs",
"usage": "media",
"server": "other-media.local",
"share": "media",
},
)
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount.name == "backup_test"
# Changing this one to non-backup should unset the default
resp = await api_client.put(
f"{prefix}/mounts/backup_test",
json={
"type": "cifs",
"usage": "media",
"server": "media.local",
"share": "media",
},
)
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount is None
async def test_delete_backup_mount_changes_default(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock],
mount,
mock_is_mount,
):
"""Test deleting a backup mount may unset the default."""
api_client, prefix = api_client_with_prefix
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_service.mock_systemd_unit = systemd_unit_service
# Make another backup mount for testing
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "other_backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
result = await resp.json()
assert result["result"] == "ok"
# Deleting this one should have no effect on the default
resp = await api_client.delete(f"{prefix}/mounts/other_backup_test")
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount.name == "backup_test"
# Deleting this current default should unset it
resp = await api_client.delete(f"{prefix}/mounts/backup_test")
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount is None
async def test_backup_mounts_reload_backups(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
all_dbus_services: dict[str, DBusServiceMock],
tmp_supervisor_data,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test actions on a backup mount reload backups."""
api_client, prefix = api_client_with_prefix
systemd_unit_service: SystemdUnitService = all_dbus_services["systemd_unit"]
systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_service.mock_systemd_unit = systemd_unit_service
await coresys.mounts.load()
with patch.object(BackupManager, "reload") as reload:
# Only creating a backup mount triggers reload
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "media_test",
"type": "cifs",
"usage": "media",
"server": "media.local",
"share": "media",
},
)
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_not_called()
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_called_once()
# Only updating a backup mount triggers reload
reload.reset_mock()
resp = await api_client.put(
f"{prefix}/mounts/media_test",
json={
"type": "cifs",
"usage": "media",
"server": "media.local",
"share": "media2",
},
)
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_not_called()
resp = await api_client.put(
f"{prefix}/mounts/backup_test",
json={
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups2",
},
)
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_called_once()
# Only reloading a backup mount triggers reload
reload.reset_mock()
resp = await api_client.post(f"{prefix}/mounts/media_test/reload")
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_not_called()
resp = await api_client.post(f"{prefix}/mounts/backup_test/reload")
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_called_once()
# Only deleting a backup mount triggers reload
reload.reset_mock()
resp = await api_client.delete(f"{prefix}/mounts/media_test")
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_not_called()
resp = await api_client.delete(f"{prefix}/mounts/backup_test")
result = await resp.json()
assert result["result"] == "ok"
await asyncio.sleep(0)
reload.assert_called_once()
async def test_options(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
mount,
mock_is_mount,
):
"""Test changing options."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "other_backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
},
)
result = await resp.json()
assert result["result"] == "ok"
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "media_test",
"type": "cifs",
"usage": "media",
"server": "media.local",
"share": "media",
},
)
result = await resp.json()
assert result["result"] == "ok"
coresys.mounts.save_data.reset_mock()
# Not a backup mount, will fail
resp = await api_client.post(
f"{prefix}/mounts/options",
json={
"default_backup_mount": "media_test",
},
)
result = await resp.json()
assert result["result"] == "error"
# Mount doesn't exist, will fail
resp = await api_client.post(
f"{prefix}/mounts/options",
json={
"default_backup_mount": "junk",
},
)
result = await resp.json()
assert result["result"] == "error"
assert coresys.mounts.default_backup_mount.name == "backup_test"
coresys.mounts.save_data.assert_not_called()
# Changes to new backup mount
resp = await api_client.post(
f"{prefix}/mounts/options",
json={
"default_backup_mount": "other_backup_test",
},
)
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount.name == "other_backup_test"
coresys.mounts.save_data.assert_called_once()
# Unsets default backup mount
resp = await api_client.post(
f"{prefix}/mounts/options",
json={
"default_backup_mount": None,
},
)
result = await resp.json()
assert result["result"] == "ok"
assert coresys.mounts.default_backup_mount is None
assert coresys.mounts.save_data.call_count == 2
async def test_api_create_mount_fails_special_chars(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
tmp_supervisor_data,
path_extern,
mount_propagation,
):
"""Test creating a mount via API fails with special characters."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "Überwachungskameras",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backups",
"version": "2.0",
},
)
result = await resp.json()
assert result["result"] == "error"
assert "does not match regular expression" in result["message"]
async def test_api_create_read_only_cifs_mount(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
tmp_supervisor_data,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test creating a read-only cifs mount via API."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "media_test",
"type": "cifs",
"usage": "media",
"server": "media.local",
"share": "media",
"version": "2.0",
"read_only": True,
},
)
result = await resp.json()
assert result["result"] == "ok"
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == [
{
"version": "2.0",
"name": "media_test",
"type": "cifs",
"usage": "media",
"server": "media.local",
"share": "media",
"state": "active",
"read_only": True,
"user_path": "/media/media_test",
}
]
coresys.mounts.save_data.assert_called_once()
async def test_api_create_read_only_nfs_mount(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
tmp_supervisor_data,
path_extern,
mount_propagation,
mock_is_mount,
):
"""Test creating a read-only nfs mount via API."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "media_test",
"type": "nfs",
"usage": "media",
"server": "media.local",
"path": "/media/camera",
"read_only": True,
},
)
result = await resp.json()
assert result["result"] == "ok"
resp = await api_client.get(f"{prefix}/mounts")
result = await resp.json()
assert result["data"]["mounts"] == [
{
"name": "media_test",
"type": "nfs",
"usage": "media",
"server": "media.local",
"path": "/media/camera",
"state": "active",
"read_only": True,
"user_path": "/media/media_test",
}
]
coresys.mounts.save_data.assert_called_once()
async def test_api_read_only_backup_mount_invalid(
api_client_with_prefix: tuple[TestClient, str],
coresys: CoreSys,
tmp_supervisor_data,
path_extern,
mount_propagation,
):
"""Test cannot create a read-only backup mount."""
api_client, prefix = api_client_with_prefix
resp = await api_client.post(
f"{prefix}/mounts",
json={
"name": "backup_test",
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "backup",
"version": "2.0",
"read_only": True,
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert "Backup mounts cannot be read only" in result["message"]
@pytest.mark.parametrize(
("method", "url"),
[
("put", "/mounts/bad"),
("delete", "/mounts/bad"),
("post", "/mounts/bad/reload"),
],
)
async def test_mount_not_found(
api_client_with_prefix: tuple[TestClient, str], method: str, url: str
):
"""Test mount not found error."""
api_client, prefix = api_client_with_prefix
resp = await api_client.request(method, f"{prefix}{url}")
assert resp.status == 404
resp = await resp.json()
assert resp["message"] == "No mount exists with name bad"