"""Common test functions.""" import asyncio from collections.abc import AsyncGenerator, Generator from datetime import datetime from pathlib import Path import subprocess from unittest.mock import AsyncMock, MagicMock, Mock, PropertyMock, patch from uuid import uuid4 from aiodocker.channel import Channel, ChannelSubscriber from aiodocker.containers import DockerContainer, DockerContainers from aiodocker.docker import DockerImages from aiodocker.events import DockerEvents from aiodocker.execs import Exec from aiodocker.networks import DockerNetwork, DockerNetworks from aiodocker.system import DockerSystem from aiodocker.volumes import DockerVolumes from aiohttp import ClientSession, web from aiohttp.test_utils import TestClient from awesomeversion import AwesomeVersion from blockbuster import BlockBuster, BlockBusterFunction from dbus_fast import BusType from dbus_fast.aio.message_bus import MessageBus import pytest from securetar import SecureTarArchive from supervisor import config as su_config from supervisor.addons.addon import Addon from supervisor.addons.validate import SCHEMA_ADDON_SYSTEM from supervisor.api import RestAPI from supervisor.backups.backup import Backup from supervisor.backups.const import BackupType from supervisor.backups.validate import ALL_FOLDERS from supervisor.bootstrap import initialize_coresys from supervisor.const import ( ATTR_ADDONS, ATTR_ADDONS_CUSTOM_LIST, ATTR_DATE, ATTR_EXCLUDE_DATABASE, ATTR_FOLDERS, ATTR_HOMEASSISTANT, ATTR_NAME, ATTR_REPOSITORIES, ATTR_SIZE, ATTR_SLUG, ATTR_TYPE, ATTR_VERSION, REQUEST_FROM, CoreState, CpuArch, ) from supervisor.coresys import CoreSys from supervisor.dbus.network import NetworkManager from supervisor.docker.manager import DockerAPI from supervisor.exceptions import HostLogError from supervisor.homeassistant.api import APIState from supervisor.host.logs import LogsControl from supervisor.os.manager import OSManager from supervisor.store.addon import AddonStore from supervisor.store.repository import Repository from supervisor.utils.dt import utcnow from .common import ( AsyncIterator, MockResponse, load_fixture, load_json_fixture, mock_dbus_services, ) from .const import TEST_ADDON_SLUG from .dbus_service_mocks.base import DBusServiceMock from .dbus_service_mocks.network_active_connection import ( DEFAULT_OBJECT_PATH as DEFAULT_ACTIVE_CONNECTION_OBJECT_PATH, ActiveConnection as ActiveConnectionService, ) from .dbus_service_mocks.network_connection_settings import ( DEFAULT_OBJECT_PATH as DEFAULT_CONNECTION_SETTINGS_OBJECT_PATH, ConnectionSettings as ConnectionSettingsService, ) from .dbus_service_mocks.network_dns_manager import DnsManager as DnsManagerService from .dbus_service_mocks.network_manager import NetworkManager as NetworkManagerService # pylint: disable=redefined-outer-name, protected-access @pytest.fixture(autouse=True) def blockbuster(request: pytest.FixtureRequest) -> BlockBuster | None: """Raise for blocking I/O in event loop.""" if getattr(request, "param", "") == "no_blockbuster": yield None return # Only scanning supervisor code for now as that's our primary interest # This will still raise for tests that call utilities in supervisor code that block # But it will ignore calls to libraries and such that do blocking I/O directly from tests # Removing that would be nice but a todo for the future SCANNED_MODULES = ["supervisor"] blockbuster = BlockBuster(scanned_modules=SCANNED_MODULES) blockbuster.functions["pathlib.Path.open"] = BlockBusterFunction( Path, "open", scanned_modules=SCANNED_MODULES ) blockbuster.functions["pathlib.Path.close"] = BlockBusterFunction( Path, "close", scanned_modules=SCANNED_MODULES ) blockbuster.activate() yield blockbuster blockbuster.deactivate() @pytest.fixture async def path_extern(monkeypatch: pytest.MonkeyPatch) -> None: """Set external path env for tests.""" monkeypatch.setenv("SUPERVISOR_SHARE", "/mnt/data/supervisor") yield @pytest.fixture async def supervisor_name(monkeypatch: pytest.MonkeyPatch) -> None: """Set env for supervisor name.""" monkeypatch.setenv("SUPERVISOR_NAME", "hassio_supervisor") yield @pytest.fixture async def docker() -> DockerAPI: """Mock DockerAPI.""" image_inspect = { "Os": "linux", "Architecture": "amd64", "Id": "test123", "RepoTags": ["ghcr.io/home-assistant/amd64-hassio-supervisor:latest"], } container_inspect = image_inspect | { "State": {"ExitCode": 0, "Status": "stopped", "Running": False}, "Image": "abc123", } network_inspect = { "Name": "hassio", "Id": "hassio123", "EnableIPv4": True, "EnableIPv6": False, "IPAM": { "Driver": "default", "Options": None, "Config": [ { "Subnet": "172.30.32.0/23", "IPRange": "172.30.33.0/24", "Gateway": "172.30.32.1", } ], }, "Containers": {}, } system_info = { "ServerVersion": "1.0.0", "Driver": "overlay2", "LoggingDriver": "journald", "CgroupVersion": "1", } with ( patch( "supervisor.docker.manager.aiodocker.Docker", return_value=( MagicMock( networks=(docker_networks := MagicMock(spec=DockerNetworks)), images=(docker_images := MagicMock(spec=DockerImages)), containers=(docker_containers := MagicMock(spec=DockerContainers)), events=(docker_events := MagicMock(spec=DockerEvents)), system=(docker_system := MagicMock(spec=DockerSystem)), volumes=MagicMock(spec=DockerVolumes), ) ), ), patch( "supervisor.docker.manager.DockerAPI.images", new=PropertyMock(return_value=docker_images), ), patch( "supervisor.docker.manager.DockerAPI.containers", new=PropertyMock(return_value=docker_containers), ), ): # Info mocking docker_system.info.return_value = system_info # Network mocking docker_networks.get.return_value = docker_network = MagicMock( spec=DockerNetwork ) docker_network.show.return_value = network_inspect # Events mocking docker_events.channel = channel = Channel() docker_events.subscribe.return_value = ChannelSubscriber(channel) docker_events.stop = lambda *_: channel.publish(None) # Images mocking docker_images.inspect.return_value = image_inspect docker_images.list.return_value = [image_inspect] docker_images.import_image = AsyncMock( return_value=[{"stream": "Loaded image: test:latest\n"}] ) docker_images.pull.return_value = AsyncIterator([{}]) # Export image mocking class MockCM: def __init__(self): self.content = [b""] async def __aenter__(self): out = MagicMock() out.iter_chunked.return_value = AsyncIterator(self.content) return out async def __aexit__(self, exc_type, exc, tb): return None docker_images.export_image.return_value = MockCM() # Containers mocking docker_containers.get.return_value = docker_container = MagicMock( spec=DockerContainer, id=container_inspect["Id"] ) docker_containers.list.return_value = [docker_container] docker_containers.create.return_value = docker_container docker_container.show.return_value = container_inspect docker_container.wait.return_value = {"StatusCode": 0} docker_container.log = AsyncMock(return_value=[]) docker_container.exec.return_value = docker_exec = MagicMock(spec=Exec) # start() with detach=False returns a Stream (not async) # Use return_value instead of side_effect to avoid it being replaced by tests docker_exec.start.return_value = create_mock_exec_stream(output=b"") docker_exec.inspect.return_value = {"ExitCode": 0} # Load Docker manager docker_obj = await DockerAPI( MagicMock(create_task=asyncio.get_running_loop().create_task) ).post_init() docker_obj.config._data = {"registries": {}} await docker_obj.load() # Mock manifest fetcher to return None (falls back to count-based progress) docker_obj._manifest_fetcher.get_manifest = AsyncMock(return_value=None) yield docker_obj # Clean up await docker_obj.unload() @pytest.fixture(scope="session") def dbus_session() -> Generator[str]: """Start a dbus session. Returns session address. """ with subprocess.Popen( [ "dbus-daemon", "--nofork", "--print-address", "--session", ], stdout=subprocess.PIPE, ) as proc: yield proc.stdout.readline().decode("utf-8").strip() proc.terminate() @pytest.fixture async def dbus_session_bus(dbus_session) -> AsyncGenerator[MessageBus, None, None]: """Return message bus connected to session dbus.""" bus = await MessageBus(bus_type=BusType.SESSION, bus_address=dbus_session).connect() yield bus bus.disconnect() @pytest.fixture async def dbus_is_connected(): """Mock DBusInterface.is_connected for tests.""" with patch( "supervisor.dbus.interface.DBusInterface.is_connected", return_value=True, ) as is_connected: yield is_connected @pytest.fixture(name="network_manager_services") async def fixture_network_manager_services( dbus_session_bus: MessageBus, ) -> dict[str, DBusServiceMock | dict[str, DBusServiceMock]]: """Mock all services network manager connects to.""" yield await mock_dbus_services( { "network_access_point": [ "/org/freedesktop/NetworkManager/AccessPoint/43099", "/org/freedesktop/NetworkManager/AccessPoint/43100", ], "network_active_connection": [ "/org/freedesktop/NetworkManager/ActiveConnection/1", "/org/freedesktop/NetworkManager/ActiveConnection/38", ], "network_connection_settings": [ "/org/freedesktop/NetworkManager/Settings/1", "/org/freedesktop/NetworkManager/Settings/38", ], "network_device_wireless": None, "network_device": [ "/org/freedesktop/NetworkManager/Devices/1", "/org/freedesktop/NetworkManager/Devices/3", "/org/freedesktop/NetworkManager/Devices/38", ], "network_device_vlan": None, "network_dns_manager": None, "network_ip4config": None, "network_ip6config": None, "network_manager": None, "network_settings": None, }, dbus_session_bus, ) @pytest.fixture async def network_manager( network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], dbus_session_bus: MessageBus, ) -> NetworkManager: """Mock Network Manager.""" nm_obj = NetworkManager() await nm_obj.connect(dbus_session_bus) yield nm_obj @pytest.fixture async def network_manager_service( network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], ) -> NetworkManagerService: """Return Network Manager service mock.""" yield network_manager_services["network_manager"] @pytest.fixture async def dns_manager_service( network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], ) -> AsyncGenerator[DnsManagerService]: """Return DNS Manager service mock.""" yield network_manager_services["network_dns_manager"] @pytest.fixture(name="active_connection_service") async def fixture_active_connection_service( network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], ) -> ActiveConnectionService: """Return mock active connection service.""" yield network_manager_services["network_active_connection"][ DEFAULT_ACTIVE_CONNECTION_OBJECT_PATH ] @pytest.fixture(name="connection_settings_service") async def fixture_connection_settings_service( network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], ) -> ConnectionSettingsService: """Return mock connection settings service.""" yield network_manager_services["network_connection_settings"][ DEFAULT_CONNECTION_SETTINGS_OBJECT_PATH ] @pytest.fixture(name="udisks2_services") async def fixture_udisks2_services( dbus_session_bus: MessageBus, ) -> dict[str, DBusServiceMock | dict[str, DBusServiceMock]]: """Mock all services UDisks2 connects to.""" yield await mock_dbus_services( { "udisks2_block": [ "/org/freedesktop/UDisks2/block_devices/loop0", "/org/freedesktop/UDisks2/block_devices/mmcblk1", "/org/freedesktop/UDisks2/block_devices/mmcblk1p1", "/org/freedesktop/UDisks2/block_devices/mmcblk1p2", "/org/freedesktop/UDisks2/block_devices/mmcblk1p3", "/org/freedesktop/UDisks2/block_devices/sda", "/org/freedesktop/UDisks2/block_devices/sda1", "/org/freedesktop/UDisks2/block_devices/sdb", "/org/freedesktop/UDisks2/block_devices/sdb1", "/org/freedesktop/UDisks2/block_devices/zram1", ], "udisks2_drive": [ "/org/freedesktop/UDisks2/drives/BJTD4R_0x97cde291", "/org/freedesktop/UDisks2/drives/Generic_Flash_Disk_61BCDDB6", "/org/freedesktop/UDisks2/drives/SSK_SSK_Storage_DF56419883D56", ], "udisks2_filesystem": [ "/org/freedesktop/UDisks2/block_devices/mmcblk1p1", "/org/freedesktop/UDisks2/block_devices/mmcblk1p3", "/org/freedesktop/UDisks2/block_devices/sda1", "/org/freedesktop/UDisks2/block_devices/sdb1", "/org/freedesktop/UDisks2/block_devices/zram1", ], "udisks2_loop": None, "udisks2_manager": None, "udisks2": None, "udisks2_partition_table": [ "/org/freedesktop/UDisks2/block_devices/mmcblk1", "/org/freedesktop/UDisks2/block_devices/sda", "/org/freedesktop/UDisks2/block_devices/sdb", ], "udisks2_partition": [ "/org/freedesktop/UDisks2/block_devices/mmcblk1p1", "/org/freedesktop/UDisks2/block_devices/mmcblk1p2", "/org/freedesktop/UDisks2/block_devices/mmcblk1p3", "/org/freedesktop/UDisks2/block_devices/sda1", "/org/freedesktop/UDisks2/block_devices/sdb1", ], }, dbus_session_bus, ) @pytest.fixture(name="os_agent_services") async def fixture_os_agent_services( dbus_session_bus: MessageBus, ) -> dict[str, DBusServiceMock]: """Mock all services os agent connects to.""" yield await mock_dbus_services( { "os_agent": None, "agent_apparmor": None, "agent_cgroup": None, "agent_datadisk": None, "agent_swap": None, "agent_system": None, "agent_boards": None, "agent_boards_yellow": None, }, dbus_session_bus, ) @pytest.fixture(name="all_dbus_services") async def fixture_all_dbus_services( dbus_session_bus: MessageBus, network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], os_agent_services: dict[str, DBusServiceMock], ) -> dict[str, DBusServiceMock | dict[str, DBusServiceMock]]: """Mock all dbus services supervisor uses.""" yield ( ( await mock_dbus_services( { "hostname": None, "logind": None, "rauc": None, "resolved": None, "systemd": None, "systemd_unit": None, "timedate": None, }, dbus_session_bus, ) ) | network_manager_services | udisks2_services | os_agent_services ) @pytest.fixture(autouse=True) def _mock_firewall(): """Mock out firewall rules by default to avoid dbus signal timeouts.""" patcher = patch( "supervisor.host.firewall.FirewallManager.apply_gateway_firewall_rules", new_callable=AsyncMock, ) patcher.start() yield patcher patcher.stop() @pytest.fixture async def coresys( docker: DockerAPI, dbus_session_bus: MessageBus, all_dbus_services, aiohttp_client, run_supervisor_state, supervisor_name, request: pytest.FixtureRequest, ) -> CoreSys: """Create a CoreSys Mock.""" with ( patch("supervisor.bootstrap.initialize_system"), patch("supervisor.utils.sentry.sentry_sdk.init"), patch("supervisor.core.Core._write_run_state"), ): coresys_obj = await initialize_coresys() # Mock save json coresys_obj._ingress.save_data = AsyncMock() coresys_obj._auth.save_data = AsyncMock() coresys_obj._updater.save_data = AsyncMock() coresys_obj._config.save_data = AsyncMock() coresys_obj._jobs.save_data = AsyncMock() coresys_obj._resolution.save_data = AsyncMock() coresys_obj._addons.data.save_data = AsyncMock() coresys_obj._store.save_data = AsyncMock() coresys_obj._mounts.save_data = AsyncMock() # Mock test client coresys_obj._supervisor.instance._meta = { "Config": {"Labels": {"io.hass.arch": "amd64"}}, "HostConfig": {"Privileged": True}, } coresys_obj.arch._default_arch = CpuArch.AMD64 coresys_obj.arch._supported_arch = [CpuArch.AMD64] coresys_obj.arch._supported_set = {CpuArch.AMD64} coresys_obj._machine = "qemux86-64" coresys_obj._machine_id = uuid4() # Mock host communication with ( patch("supervisor.dbus.manager.MessageBus") as message_bus, patch("supervisor.dbus.manager.SOCKET_DBUS"), ): message_bus.return_value.connect = AsyncMock(return_value=dbus_session_bus) await coresys_obj._dbus.load() # Mock docker coresys_obj._docker = docker coresys_obj.docker.coresys = coresys_obj docker.monitor.coresys = coresys_obj # Set internet state coresys_obj.supervisor._connectivity = True coresys_obj.host.network._connectivity = True # Fix Paths su_config.ADDONS_CORE = Path( Path(__file__).parent.joinpath("fixtures"), "addons/core" ) su_config.ADDONS_LOCAL = Path( Path(__file__).parent.joinpath("fixtures"), "addons/local" ) su_config.ADDONS_GIT = Path( Path(__file__).parent.joinpath("fixtures"), "addons/git" ) su_config.APPARMOR_DATA = Path( Path(__file__).parent.joinpath("fixtures"), "apparmor" ) # WebSocket coresys_obj.homeassistant.api.get_api_state = AsyncMock( return_value=APIState("RUNNING", False) ) coresys_obj.homeassistant._websocket._client = AsyncMock( ha_version=AwesomeVersion("2021.2.4") ) if not request.node.get_closest_marker("no_mock_init_websession"): coresys_obj.init_websession = AsyncMock() # Don't remove files/folders related to addons and stores with patch("supervisor.store.git.GitRepo.remove"): yield coresys_obj await coresys_obj.dbus.unload() @pytest.fixture async def ha_ws_client(coresys: CoreSys) -> AsyncMock: """Return HA WS client mock for assertions.""" # Set Supervisor Core state to RUNNING, otherwise WS events won't be delivered await coresys.core.set_state(CoreState.RUNNING) await asyncio.sleep(0) client = coresys.homeassistant.websocket._client client.async_send_command.reset_mock() return client @pytest.fixture async def tmp_supervisor_data(coresys: CoreSys, tmp_path: Path) -> Path: """Patch supervisor data to be tmp_path.""" with patch.object( su_config.CoreConfig, "path_supervisor", new=PropertyMock(return_value=tmp_path) ): coresys.config.path_emergency.mkdir() coresys.config.path_media.mkdir() coresys.config.path_mounts.mkdir() coresys.config.path_mounts_credentials.mkdir() coresys.config.path_backup.mkdir() coresys.config.path_tmp.mkdir() coresys.config.path_homeassistant.mkdir() coresys.config.path_audio.mkdir() coresys.config.path_dns.mkdir() coresys.config.path_share.mkdir() coresys.config.path_addons_data.mkdir(parents=True) coresys.config.path_addon_configs.mkdir(parents=True) coresys.config.path_ssl.mkdir() coresys.config.path_core_backup.mkdir(parents=True) coresys.config.path_cid_files.mkdir() yield tmp_path @pytest.fixture async def journald_gateway() -> AsyncGenerator[MagicMock]: """Mock logs control.""" with ( patch("supervisor.host.logs.Path.is_socket", return_value=True), patch("supervisor.host.logs.ClientSession.get") as get, ): reader = asyncio.StreamReader(loop=asyncio.get_running_loop()) client_response = MagicMock(content=reader, get=get) async def response_text(): return (await client_response.content.read()).decode("utf-8") client_response.text = response_text client_response.status = 200 get.return_value.__aenter__.return_value = client_response get.return_value.__aenter__.return_value.__aenter__.return_value = ( client_response ) yield client_response @pytest.fixture async def without_journal_gatewayd_boots() -> AsyncGenerator[MagicMock]: """Make method using /boots of systemd-journald-gateway fail.""" def raise_host_log_error_side_effect(*args, **kwargs): raise HostLogError("Mocked error") with patch( "supervisor.host.logs.LogsControl._get_boot_ids_native" ) as get_boot_ids_native: get_boot_ids_native.side_effect = raise_host_log_error_side_effect yield get_boot_ids_native @pytest.fixture async def journal_logs_reader() -> MagicMock: """Mock journal_logs_reader in host API.""" with patch("supervisor.api.host.journal_logs_reader") as reader: yield reader @pytest.fixture def sys_machine(): """Mock sys_machine.""" with patch("supervisor.coresys.CoreSys.machine", new_callable=PropertyMock) as mock: yield mock @pytest.fixture def sys_supervisor(): """Mock sys_supervisor.""" with patch( "supervisor.coresys.CoreSys.supervisor", new_callable=PropertyMock ) as mock: mock.return_value = MagicMock() yield MagicMock @pytest.fixture async def api_client( aiohttp_client, coresys: CoreSys, request: pytest.FixtureRequest, ) -> TestClient: """Fixture for RestAPI client.""" request_from: str | None = getattr(request, "param", None) @web.middleware async def _security_middleware(request: web.Request, handler: web.RequestHandler): """Make request are from Core or specified add-on.""" if request_from: request[REQUEST_FROM] = coresys.addons.get(request_from, local_only=True) else: request[REQUEST_FROM] = coresys.homeassistant return await handler(request) api = RestAPI(coresys) api.webapp = web.Application(middlewares=[_security_middleware]) api.start = AsyncMock() await api.load() yield await aiohttp_client(api.webapp) @pytest.fixture def supervisor_internet(coresys: CoreSys) -> Generator[AsyncMock]: """Fixture which simluate Supervsior internet connection.""" connectivity_check = AsyncMock(return_value=True) coresys.supervisor.check_connectivity = connectivity_check yield connectivity_check @pytest.fixture def websession(coresys: CoreSys) -> Generator[MagicMock]: """Fixture for global aiohttp SessionClient.""" coresys._websession = MagicMock(spec_set=ClientSession) yield coresys._websession @pytest.fixture def mock_update_data(websession: MagicMock) -> Generator[MockResponse]: """Mock updater JSON data.""" version_data = load_fixture("version_stable.json") client_response = MockResponse(text=version_data) client_response.status = 200 websession.get = MagicMock(return_value=client_response) yield client_response @pytest.fixture def store_manager(coresys: CoreSys): """Fixture for the store manager.""" sm_obj = coresys.store with patch("supervisor.store.data.StoreData.update", return_value=MagicMock()): yield sm_obj @pytest.fixture def run_supervisor_state(request: pytest.FixtureRequest) -> Generator[MagicMock]: """Fixture to simulate Supervisor state file in /run/supervisor.""" if getattr(request, "param", "test_file"): with patch("supervisor.core.RUN_SUPERVISOR_STATE") as mock_run: yield mock_run else: with patch("supervisor.core.Core._write_run_state") as mock_write_state: yield mock_write_state @pytest.fixture def store_addon(coresys: CoreSys, tmp_path, test_repository): """Store add-on fixture.""" addon_obj = AddonStore(coresys, "test_store_addon") coresys.addons.store[addon_obj.slug] = addon_obj coresys.store.data.addons[addon_obj.slug] = SCHEMA_ADDON_SYSTEM( load_json_fixture("add-on.json") ) coresys.store.data.addons[addon_obj.slug]["location"] = tmp_path yield addon_obj @pytest.fixture async def test_repository(coresys: CoreSys): """Test add-on store repository fixture.""" coresys.config._data[ATTR_ADDONS_CUSTOM_LIST] = [] with ( patch("supervisor.store.git.GitRepo.load", return_value=None), ): await coresys.store.load() repository_obj = Repository.create( coresys, "https://github.com/awesome-developer/awesome-repo" ) coresys.store.repositories[repository_obj.slug] = repository_obj coresys.store._data[ATTR_REPOSITORIES].append( "https://github.com/awesome-developer/awesome-repo" ) yield repository_obj @pytest.fixture async def install_addon_ssh(coresys: CoreSys, test_repository): """Install local_ssh add-on.""" store = coresys.addons.store[TEST_ADDON_SLUG] await coresys.addons.data.install(store) coresys.addons.data._data = coresys.addons.data._schema(coresys.addons.data._data) addon = Addon(coresys, store.slug) coresys.addons.local[addon.slug] = addon yield addon @pytest.fixture async def install_addon_example(coresys: CoreSys, test_repository): """Install local_example add-on.""" store = coresys.addons.store["local_example"] await coresys.addons.data.install(store) coresys.addons.data._data = coresys.addons.data._schema(coresys.addons.data._data) addon = Addon(coresys, store.slug) coresys.addons.local[addon.slug] = addon yield addon @pytest.fixture async def mock_full_backup(coresys: CoreSys, tmp_path) -> Backup: """Mock a full backup.""" mock_backup = Backup( coresys, Path(tmp_path, "test_backup.tar"), "test", None, None, 10240 ) mock_backup.new("Test", utcnow().isoformat(), BackupType.FULL) mock_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"] mock_backup.docker = {} mock_backup._data[ATTR_ADDONS] = [ { ATTR_SLUG: "local_ssh", ATTR_NAME: "SSH", ATTR_VERSION: "1.0.0", ATTR_SIZE: 0, } ] mock_backup._data[ATTR_FOLDERS] = ALL_FOLDERS mock_backup._data[ATTR_HOMEASSISTANT] = { ATTR_VERSION: AwesomeVersion("2022.8.0"), ATTR_SIZE: 0, ATTR_EXCLUDE_DATABASE: False, } coresys.backups._backups = {"test": mock_backup} yield mock_backup @pytest.fixture async def mock_partial_backup(coresys: CoreSys, tmp_path) -> Backup: """Mock a partial backup.""" mock_backup = Backup( coresys, Path(tmp_path, "test_backup.tar"), "test", None, None, 10240 ) mock_backup.new("Test", utcnow().isoformat(), BackupType.PARTIAL) mock_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"] mock_backup.docker = {} mock_backup._data[ATTR_ADDONS] = [ { ATTR_SLUG: "local_ssh", ATTR_NAME: "SSH", ATTR_VERSION: "1.0.0", ATTR_SIZE: 0, } ] mock_backup._data[ATTR_FOLDERS] = ALL_FOLDERS mock_backup._data[ATTR_HOMEASSISTANT] = { ATTR_VERSION: AwesomeVersion("2022.8.0"), ATTR_SIZE: 0, ATTR_EXCLUDE_DATABASE: False, } coresys.backups._backups = {"test": mock_backup} yield mock_backup @pytest.fixture async def backups( coresys: CoreSys, tmp_path, request: pytest.FixtureRequest ) -> list[Backup]: """Create and return mock backups.""" for i in range(request.param if hasattr(request, "param") else 5): slug = f"sn{i + 1}" temp_tar = Path(tmp_path, f"{slug}.tar") with SecureTarArchive(temp_tar, "w"): pass backup = Backup(coresys, temp_tar, slug, None) backup._data = { # pylint: disable=protected-access ATTR_SLUG: slug, ATTR_DATE: utcnow().isoformat(), ATTR_TYPE: BackupType.PARTIAL if slug[-1] == "1" or slug[-1] == "5" else BackupType.FULL, } coresys.backups._backups[backup.slug] = backup yield coresys.backups.list_backups @pytest.fixture async def journald_logs(coresys: CoreSys) -> MagicMock: """Mock journald logs and make it available.""" with ( patch.object(LogsControl, "available", new=PropertyMock(return_value=True)), patch.object(LogsControl, "get_boot_ids", return_value=["aaa", "bbb", "ccc"]), patch.object( LogsControl, "get_identifiers", return_value=["hassio_supervisor", "hassos-config", "kernel"], ), patch.object(LogsControl, "journald_logs", new=MagicMock()) as logs, ): await coresys.host.logs.load() yield logs @pytest.fixture async def docker_logs(container: DockerContainer, supervisor_name) -> AsyncMock: """Mock log output for a container from docker.""" logs = load_fixture("logs_docker_container.txt") container.log.return_value = logs.splitlines() yield container.log @pytest.fixture async def capture_exception() -> Mock: """Mock capture exception method for testing.""" with ( patch("supervisor.utils.sentry.sentry_sdk.is_initialized", return_value=True), patch( "supervisor.utils.sentry.sentry_sdk.capture_exception" ) as capture_exception, ): yield capture_exception @pytest.fixture async def os_available(request: pytest.FixtureRequest) -> None: """Mock os as available.""" version = ( AwesomeVersion(request.param) if hasattr(request, "param") else AwesomeVersion("16.2") ) with ( patch.object(OSManager, "available", new=PropertyMock(return_value=True)), patch.object(OSManager, "version", new=PropertyMock(return_value=version)), ): yield def create_mock_exec_stream(output: bytes = b"") -> AsyncMock: """Create a mock stream for exec with detach=False.""" stream = AsyncMock() # Set up async context manager stream.__aenter__.return_value = stream stream.__aexit__.return_value = None # Set up read_out to return messages then None (EOF) if output: Message = type("Message", (), {"data": output}) stream.read_out.side_effect = [Message(), None] else: stream.read_out.return_value = None return stream @pytest.fixture async def container(docker: DockerAPI) -> DockerContainer: """Mock attrs and status for container on attach.""" container_mock = docker.containers.get.return_value # Set up exec mock to return a mock stream # Note: This must be a regular function, not async, to match aiodocker's start() behavior def mock_exec_start(detach=False): if detach: # Old behavior for detach=True (shouldn't be used anymore) # Would need to return an awaitable async def _async_start(): return b"" return _async_start() # Return mock stream for detach=False (synchronous) return create_mock_exec_stream(output=b"") # Replace the mock's start method with a MagicMock (not AsyncMock) start_mock = MagicMock(side_effect=mock_exec_start) container_mock.exec.return_value.start = start_mock # Store the original side_effect so tests can override it but we can restore if needed start_mock._original_side_effect = mock_exec_start container_mock.exec.return_value.inspect.return_value = { "Running": False, "ExitCode": 0, "Pid": 12345, } yield container_mock @pytest.fixture async def mount_propagation(container: DockerContainer, coresys: CoreSys) -> None: """Mock supervisor connected to container with propagation set.""" container.show.return_value["Mounts"] = [ { "Type": "bind", "Source": "/mnt/data/supervisor", "Destination": "/data", "Mode": "rw", "RW": True, "Propagation": "slave", } ] await coresys.supervisor.load() yield @pytest.fixture def mock_amd64_arch_supported(coresys: CoreSys) -> None: """Mock amd64 arch as supported.""" coresys.arch._supported_arch = [CpuArch.AMD64] coresys.arch._supported_set = {CpuArch.AMD64} @pytest.fixture def mock_aarch64_arch_supported(coresys: CoreSys) -> None: """Mock aarch64 arch as supported.""" coresys.arch._supported_arch = [CpuArch.AMD64] coresys.arch._supported_set = {CpuArch.AMD64} @pytest.fixture def mock_is_mount() -> MagicMock: """Mock is_mount in mounts.""" with patch("supervisor.mounts.mount.Path.is_mount", return_value=True) as is_mount: yield is_mount @pytest.fixture def no_job_throttle(): """Remove job throttle for tests.""" with patch("supervisor.jobs.decorator.Job.last_call", return_value=datetime.min): yield