mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-17 15:23:21 +01:00
Switch internal Supervisor-to-Core HTTP and WebSocket communication from TCP (port 8123) to a Unix domain socket. The existing /run/supervisor directory on the host (already mounted at /run/os inside the Supervisor container) is bind-mounted into the Core container at /run/supervisor. Core receives the socket path via the SUPERVISOR_CORE_API_SOCKET environment variable, creates the socket there, and Supervisor connects to it via aiohttp.UnixConnector at /run/os/core.sock. Since the Unix socket is only reachable by processes on the same host, requests arriving over it are implicitly trusted and authenticated as the existing Supervisor system user. This removes the token round-trip where Supervisor had to obtain and send Bearer tokens on every Core API call. WebSocket connections are likewise authenticated implicitly, skipping the auth_required/auth handshake. Key design decisions: - Version-gated by CORE_UNIX_SOCKET_MIN_VERSION so older Core versions transparently continue using TCP with token auth - LANDINGPAGE is explicitly excluded (not a CalVer version) - Hard-fails with a clear error if the socket file is unexpectedly missing when Unix socket communication is expected - WSClient.connect() for Unix socket (no auth) and WSClient.connect_with_auth() for TCP (token auth) separate the two connection modes cleanly - Token refresh always uses the TCP websession since it is inherently a TCP/Bearer-auth operation - Logs which transport (Unix socket vs TCP) is being used on first request Closes #6626 Related Core PR: home-assistant/core#163907 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1024 lines
34 KiB
Python
1024 lines
34 KiB
Python
"""Common test functions."""
|
|
|
|
import asyncio
|
|
from collections.abc import AsyncGenerator, Generator
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import subprocess
|
|
from unittest.mock import AsyncMock, MagicMock, Mock, PropertyMock, patch
|
|
from uuid import uuid4
|
|
|
|
from aiodocker.channel import Channel, ChannelSubscriber
|
|
from aiodocker.containers import DockerContainer, DockerContainers
|
|
from aiodocker.docker import DockerImages
|
|
from aiodocker.events import DockerEvents
|
|
from aiodocker.execs import Exec
|
|
from aiodocker.networks import DockerNetwork, DockerNetworks
|
|
from aiodocker.system import DockerSystem
|
|
from aiodocker.volumes import DockerVolumes
|
|
from aiohttp import ClientSession, web
|
|
from aiohttp.test_utils import TestClient
|
|
from awesomeversion import AwesomeVersion
|
|
from blockbuster import BlockBuster, BlockBusterFunction
|
|
from dbus_fast import BusType
|
|
from dbus_fast.aio.message_bus import MessageBus
|
|
import pytest
|
|
from securetar import SecureTarArchive
|
|
|
|
from supervisor import config as su_config
|
|
from supervisor.addons.addon import Addon
|
|
from supervisor.addons.validate import SCHEMA_ADDON_SYSTEM
|
|
from supervisor.api import RestAPI
|
|
from supervisor.backups.backup import Backup
|
|
from supervisor.backups.const import BackupType
|
|
from supervisor.backups.validate import ALL_FOLDERS
|
|
from supervisor.bootstrap import initialize_coresys
|
|
from supervisor.const import (
|
|
ATTR_ADDONS,
|
|
ATTR_ADDONS_CUSTOM_LIST,
|
|
ATTR_DATE,
|
|
ATTR_EXCLUDE_DATABASE,
|
|
ATTR_FOLDERS,
|
|
ATTR_HOMEASSISTANT,
|
|
ATTR_NAME,
|
|
ATTR_REPOSITORIES,
|
|
ATTR_SIZE,
|
|
ATTR_SLUG,
|
|
ATTR_TYPE,
|
|
ATTR_VERSION,
|
|
REQUEST_FROM,
|
|
CoreState,
|
|
CpuArch,
|
|
)
|
|
from supervisor.coresys import CoreSys
|
|
from supervisor.dbus.network import NetworkManager
|
|
from supervisor.docker.manager import DockerAPI
|
|
from supervisor.exceptions import HostLogError
|
|
from supervisor.homeassistant.api import APIState
|
|
from supervisor.host.logs import LogsControl
|
|
from supervisor.os.manager import OSManager
|
|
from supervisor.store.addon import AddonStore
|
|
from supervisor.store.repository import Repository
|
|
from supervisor.utils.dt import utcnow
|
|
|
|
from .common import (
|
|
AsyncIterator,
|
|
MockResponse,
|
|
load_fixture,
|
|
load_json_fixture,
|
|
mock_dbus_services,
|
|
)
|
|
from .const import TEST_ADDON_SLUG
|
|
from .dbus_service_mocks.base import DBusServiceMock
|
|
from .dbus_service_mocks.network_active_connection import (
|
|
DEFAULT_OBJECT_PATH as DEFAULT_ACTIVE_CONNECTION_OBJECT_PATH,
|
|
ActiveConnection as ActiveConnectionService,
|
|
)
|
|
from .dbus_service_mocks.network_connection_settings import (
|
|
DEFAULT_OBJECT_PATH as DEFAULT_CONNECTION_SETTINGS_OBJECT_PATH,
|
|
ConnectionSettings as ConnectionSettingsService,
|
|
)
|
|
from .dbus_service_mocks.network_dns_manager import DnsManager as DnsManagerService
|
|
from .dbus_service_mocks.network_manager import NetworkManager as NetworkManagerService
|
|
|
|
# pylint: disable=redefined-outer-name, protected-access
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def blockbuster(request: pytest.FixtureRequest) -> BlockBuster | None:
    """Detect blocking I/O inside the event loop while a test runs.

    Yields the activated ``BlockBuster`` instance, or ``None`` when the
    fixture is parametrized with ``"no_blockbuster"``.
    """
    if getattr(request, "param", "") != "no_blockbuster":
        # Only supervisor code is scanned for now since that is our primary
        # interest. Tests calling blocking supervisor utilities still raise,
        # but blocking I/O done directly by libraries or by the tests
        # themselves is ignored. Lifting that restriction is a future todo.
        scanned = ["supervisor"]
        detector = BlockBuster(scanned_modules=scanned)

        # Register additional pathlib entry points to watch
        for key, method in (
            ("pathlib.Path.open", "open"),
            ("pathlib.Path.close", "close"),
        ):
            detector.functions[key] = BlockBusterFunction(
                Path, method, scanned_modules=scanned
            )

        detector.activate()
        yield detector
        detector.deactivate()
    else:
        yield None
|
|
|
|
|
|
@pytest.fixture
async def path_extern(monkeypatch: pytest.MonkeyPatch) -> None:
    """Expose the external supervisor share path through the environment."""
    monkeypatch.setenv(name="SUPERVISOR_SHARE", value="/mnt/data/supervisor")
    yield None
|
|
|
|
|
|
@pytest.fixture
async def supervisor_name(monkeypatch: pytest.MonkeyPatch) -> None:
    """Expose the supervisor container name through the environment."""
    monkeypatch.setenv(name="SUPERVISOR_NAME", value="hassio_supervisor")
    yield None
|
|
|
|
|
|
@pytest.fixture
async def docker() -> DockerAPI:
    """Yield a loaded DockerAPI whose aiodocker client is replaced by mocks.

    The networks, images, containers, events and system endpoints are
    ``MagicMock`` objects preloaded with canned inspect data. The manager is
    unloaded again once the test is done.
    """
    # Canned payloads handed out by the mocked endpoints
    image_info = {
        "Os": "linux",
        "Architecture": "amd64",
        "Id": "test123",
        "RepoTags": ["ghcr.io/home-assistant/amd64-hassio-supervisor:latest"],
    }
    container_info = {
        **image_info,
        "State": {"ExitCode": 0, "Status": "stopped", "Running": False},
        "Image": "abc123",
    }
    network_info = {
        "Name": "hassio",
        "Id": "hassio123",
        "EnableIPv4": True,
        "EnableIPv6": False,
        "IPAM": {
            "Driver": "default",
            "Options": None,
            "Config": [
                {
                    "Subnet": "172.30.32.0/23",
                    "IPRange": "172.30.33.0/24",
                    "Gateway": "172.30.32.1",
                }
            ],
        },
        "Containers": {},
    }
    daemon_info = {
        "ServerVersion": "1.0.0",
        "Driver": "overlay2",
        "LoggingDriver": "journald",
        "CgroupVersion": "1",
    }

    # One mock per aiodocker endpoint, bundled into a fake client
    networks_api = MagicMock(spec=DockerNetworks)
    images_api = MagicMock(spec=DockerImages)
    containers_api = MagicMock(spec=DockerContainers)
    events_api = MagicMock(spec=DockerEvents)
    system_api = MagicMock(spec=DockerSystem)
    client = MagicMock(
        networks=networks_api,
        images=images_api,
        containers=containers_api,
        events=events_api,
        system=system_api,
        volumes=MagicMock(spec=DockerVolumes),
    )

    with (
        patch("supervisor.docker.manager.aiodocker.Docker", return_value=client),
        patch(
            "supervisor.docker.manager.DockerAPI.images",
            new=PropertyMock(return_value=images_api),
        ),
        patch(
            "supervisor.docker.manager.DockerAPI.containers",
            new=PropertyMock(return_value=containers_api),
        ),
    ):
        # Info mocking
        system_api.info.return_value = daemon_info

        # Network mocking
        network_mock = MagicMock(spec=DockerNetwork)
        networks_api.get.return_value = network_mock
        network_mock.show.return_value = network_info

        # Events mocking: stopping publishes None on the shared channel
        event_channel = Channel()

        def _stop_events(*_):
            return event_channel.publish(None)

        events_api.channel = event_channel
        events_api.subscribe.return_value = ChannelSubscriber(event_channel)
        events_api.stop = _stop_events

        # Images mocking
        images_api.inspect.return_value = image_info
        images_api.list.return_value = [image_info]
        images_api.import_image = AsyncMock(
            return_value=[{"stream": "Loaded image: test:latest\n"}]
        )
        images_api.pull.return_value = AsyncIterator([{}])

        # Export image mocking
        class MockCM:
            """Async context manager yielding an object with chunked content."""

            def __init__(self):
                self.content = [b""]

            async def __aenter__(self):
                response = MagicMock()
                response.iter_chunked.return_value = AsyncIterator(self.content)
                return response

            async def __aexit__(self, exc_type, exc, tb):
                return None

        images_api.export_image.return_value = MockCM()

        # Containers mocking
        container_mock = MagicMock(spec=DockerContainer, id=container_info["Id"])
        containers_api.get.return_value = container_mock
        containers_api.list.return_value = [container_mock]
        containers_api.create.return_value = container_mock
        container_mock.show.return_value = container_info
        container_mock.wait.return_value = {"StatusCode": 0}
        container_mock.log = AsyncMock(return_value=[])

        exec_mock = MagicMock(spec=Exec)
        container_mock.exec.return_value = exec_mock
        # start() with detach=False returns a Stream (not async)
        # return_value is used instead of side_effect so tests replacing
        # side_effect do not clobber it
        exec_mock.start.return_value = create_mock_exec_stream(output=b"")
        exec_mock.inspect.return_value = {"ExitCode": 0}

        # Load Docker manager
        task_owner = MagicMock(create_task=asyncio.get_running_loop().create_task)
        docker_api = await DockerAPI(task_owner).post_init()
        docker_api.config._data = {"registries": {}}
        await docker_api.load()

        # Manifest fetcher returns None (falls back to count-based progress)
        docker_api._manifest_fetcher.get_manifest = AsyncMock(return_value=None)

        yield docker_api

        # Clean up
        await docker_api.unload()
|
|
|
|
|
|
@pytest.fixture(scope="session")
def dbus_session() -> Generator[str]:
    """Run a session dbus-daemon for the whole test session.

    Yields the address printed by the daemon on its first stdout line and
    terminates the process afterwards.
    """
    command = ["dbus-daemon", "--nofork", "--print-address", "--session"]
    with subprocess.Popen(command, stdout=subprocess.PIPE) as daemon:
        address = daemon.stdout.readline().decode("utf-8").strip()
        yield address
        daemon.terminate()
|
|
|
|
|
|
@pytest.fixture
async def dbus_session_bus(dbus_session) -> AsyncGenerator[MessageBus]:
    """Yield a message bus connected to the session dbus.

    ``dbus_session`` supplies the bus address. The bus is disconnected once
    the test has finished.
    """
    # AsyncGenerator takes (yield type, send type) only; the former third
    # type argument was invalid for type checkers.
    bus = await MessageBus(bus_type=BusType.SESSION, bus_address=dbus_session).connect()
    yield bus
    bus.disconnect()
|
|
|
|
|
|
@pytest.fixture
async def dbus_is_connected():
    """Patch DBusInterface.is_connected to report True and yield the mock."""
    target = "supervisor.dbus.interface.DBusInterface.is_connected"
    with patch(target, return_value=True) as connected_mock:
        yield connected_mock
|
|
|
|
|
|
@pytest.fixture(name="network_manager_services")
async def fixture_network_manager_services(
    dbus_session_bus: MessageBus,
) -> dict[str, DBusServiceMock | dict[str, DBusServiceMock]]:
    """Register mocks for every service Network Manager talks to.

    Yields the mapping returned by ``mock_dbus_services``.
    """
    # Service module name -> object paths to export (None: no explicit paths)
    wanted = {
        "network_access_point": [
            "/org/freedesktop/NetworkManager/AccessPoint/43099",
            "/org/freedesktop/NetworkManager/AccessPoint/43100",
        ],
        "network_active_connection": [
            "/org/freedesktop/NetworkManager/ActiveConnection/1",
            "/org/freedesktop/NetworkManager/ActiveConnection/38",
        ],
        "network_connection_settings": [
            "/org/freedesktop/NetworkManager/Settings/1",
            "/org/freedesktop/NetworkManager/Settings/38",
        ],
        "network_device_wireless": None,
        "network_device": [
            "/org/freedesktop/NetworkManager/Devices/1",
            "/org/freedesktop/NetworkManager/Devices/3",
            "/org/freedesktop/NetworkManager/Devices/38",
        ],
        "network_device_vlan": None,
        "network_dns_manager": None,
        "network_ip4config": None,
        "network_ip6config": None,
        "network_manager": None,
        "network_settings": None,
    }
    services = await mock_dbus_services(wanted, dbus_session_bus)
    yield services
|
|
|
|
|
|
@pytest.fixture
async def network_manager(
    network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
    dbus_session_bus: MessageBus,
) -> NetworkManager:
    """Yield a NetworkManager connected to the mocked session bus."""
    manager = NetworkManager()
    await manager.connect(dbus_session_bus)
    yield manager
|
|
|
|
|
|
@pytest.fixture
async def network_manager_service(
    network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> NetworkManagerService:
    """Yield the ``network_manager`` entry of the mocked services."""
    service = network_manager_services["network_manager"]
    yield service
|
|
|
|
|
|
@pytest.fixture
async def dns_manager_service(
    network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> AsyncGenerator[DnsManagerService]:
    """Yield the ``network_dns_manager`` entry of the mocked services."""
    service = network_manager_services["network_dns_manager"]
    yield service
|
|
|
|
|
|
@pytest.fixture(name="active_connection_service")
async def fixture_active_connection_service(
    network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> ActiveConnectionService:
    """Yield the active connection mock registered at the default object path."""
    by_path = network_manager_services["network_active_connection"]
    yield by_path[DEFAULT_ACTIVE_CONNECTION_OBJECT_PATH]
|
|
|
|
|
|
@pytest.fixture(name="connection_settings_service")
async def fixture_connection_settings_service(
    network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> ConnectionSettingsService:
    """Yield the connection settings mock registered at the default object path."""
    by_path = network_manager_services["network_connection_settings"]
    yield by_path[DEFAULT_CONNECTION_SETTINGS_OBJECT_PATH]
|
|
|
|
|
|
@pytest.fixture(name="udisks2_services")
async def fixture_udisks2_services(
    dbus_session_bus: MessageBus,
) -> dict[str, DBusServiceMock | dict[str, DBusServiceMock]]:
    """Register mocks for every service UDisks2 talks to.

    Yields the mapping returned by ``mock_dbus_services``.
    """
    # Service module name -> object paths to export (None: no explicit paths)
    wanted = {
        "udisks2_block": [
            "/org/freedesktop/UDisks2/block_devices/loop0",
            "/org/freedesktop/UDisks2/block_devices/mmcblk1",
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p1",
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p2",
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p3",
            "/org/freedesktop/UDisks2/block_devices/sda",
            "/org/freedesktop/UDisks2/block_devices/sda1",
            "/org/freedesktop/UDisks2/block_devices/sdb",
            "/org/freedesktop/UDisks2/block_devices/sdb1",
            "/org/freedesktop/UDisks2/block_devices/zram1",
        ],
        "udisks2_drive": [
            "/org/freedesktop/UDisks2/drives/BJTD4R_0x97cde291",
            "/org/freedesktop/UDisks2/drives/Generic_Flash_Disk_61BCDDB6",
            "/org/freedesktop/UDisks2/drives/SSK_SSK_Storage_DF56419883D56",
        ],
        "udisks2_filesystem": [
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p1",
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p3",
            "/org/freedesktop/UDisks2/block_devices/sda1",
            "/org/freedesktop/UDisks2/block_devices/sdb1",
            "/org/freedesktop/UDisks2/block_devices/zram1",
        ],
        "udisks2_loop": None,
        "udisks2_manager": None,
        "udisks2": None,
        "udisks2_partition_table": [
            "/org/freedesktop/UDisks2/block_devices/mmcblk1",
            "/org/freedesktop/UDisks2/block_devices/sda",
            "/org/freedesktop/UDisks2/block_devices/sdb",
        ],
        "udisks2_partition": [
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p1",
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p2",
            "/org/freedesktop/UDisks2/block_devices/mmcblk1p3",
            "/org/freedesktop/UDisks2/block_devices/sda1",
            "/org/freedesktop/UDisks2/block_devices/sdb1",
        ],
    }
    services = await mock_dbus_services(wanted, dbus_session_bus)
    yield services
|
|
|
|
|
|
@pytest.fixture(name="os_agent_services")
async def fixture_os_agent_services(
    dbus_session_bus: MessageBus,
) -> dict[str, DBusServiceMock]:
    """Register mocks for every service the OS agent talks to.

    Yields the mapping returned by ``mock_dbus_services``.
    """
    wanted = {
        "os_agent": None,
        "agent_apparmor": None,
        "agent_cgroup": None,
        "agent_datadisk": None,
        "agent_swap": None,
        "agent_system": None,
        "agent_boards": None,
        "agent_boards_yellow": None,
    }
    services = await mock_dbus_services(wanted, dbus_session_bus)
    yield services
|
|
|
|
|
|
@pytest.fixture(name="all_dbus_services")
async def fixture_all_dbus_services(
    dbus_session_bus: MessageBus,
    network_manager_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
    udisks2_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
    os_agent_services: dict[str, DBusServiceMock],
) -> dict[str, DBusServiceMock | dict[str, DBusServiceMock]]:
    """Register the remaining dbus mocks and yield all of them merged.

    The yielded dict combines the services mocked here with the network
    manager, UDisks2 and OS agent service mappings.
    """
    wanted = {
        "hostname": None,
        "logind": None,
        "rauc": None,
        "resolved": None,
        "systemd": None,
        "systemd_unit": None,
        "timedate": None,
    }
    base_services = await mock_dbus_services(wanted, dbus_session_bus)
    yield (
        base_services | network_manager_services | udisks2_services | os_agent_services
    )
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _mock_firewall():
    """Replace gateway firewall rule application with an AsyncMock by default.

    This avoids dbus signal timeouts. The started patcher object is yielded
    and stopped again after the test.
    """
    firewall_patcher = patch(
        "supervisor.host.firewall.FirewallManager.apply_gateway_firewall_rules",
        new_callable=AsyncMock,
    )
    firewall_patcher.start()
    yield firewall_patcher
    firewall_patcher.stop()
|
|
|
|
|
|
@pytest.fixture
async def coresys(
    docker: DockerAPI,
    dbus_session_bus: MessageBus,
    all_dbus_services,
    aiohttp_client,
    run_supervisor_state,
    supervisor_name,
    request: pytest.FixtureRequest,
) -> CoreSys:
    """Build a CoreSys wired to the mocked docker and dbus fixtures.

    Persistence methods are replaced by AsyncMocks, architecture and machine
    data are pinned, add-on paths point at the test fixtures and the Home
    Assistant websocket client is mocked. Dbus is unloaded after the test.
    """
    with (
        patch("supervisor.bootstrap.initialize_system"),
        patch("supervisor.utils.sentry.sentry_sdk.init"),
        patch("supervisor.core.Core._write_run_state"),
    ):
        core = await initialize_coresys()

    # Mock save json
    for holder in (
        core._ingress,
        core._auth,
        core._updater,
        core._config,
        core._jobs,
        core._resolution,
        core._addons.data,
        core._store,
        core._mounts,
    ):
        holder.save_data = AsyncMock()

    # Mock test client
    core._supervisor.instance._meta = {
        "Config": {"Labels": {"io.hass.arch": "amd64"}},
        "HostConfig": {"Privileged": True},
    }
    core.arch._default_arch = CpuArch.AMD64
    core.arch._supported_arch = [CpuArch.AMD64]
    core.arch._supported_set = {CpuArch.AMD64}
    core._machine = "qemux86-64"
    core._machine_id = uuid4()

    # Mock host communication
    with (
        patch("supervisor.dbus.manager.MessageBus") as bus_cls,
        patch("supervisor.dbus.manager.SOCKET_DBUS"),
    ):
        bus_cls.return_value.connect = AsyncMock(return_value=dbus_session_bus)
        await core._dbus.load()

    # Mock docker
    core._docker = docker
    core.docker.coresys = core
    docker.monitor.coresys = core

    # Set internet state
    core.supervisor._connectivity = True
    core.host.network._connectivity = True

    # Fix Paths
    fixtures_dir = Path(__file__).parent.joinpath("fixtures")
    su_config.ADDONS_CORE = Path(fixtures_dir, "addons/core")
    su_config.ADDONS_LOCAL = Path(fixtures_dir, "addons/local")
    su_config.ADDONS_GIT = Path(fixtures_dir, "addons/git")
    su_config.APPARMOR_DATA = Path(fixtures_dir, "apparmor")

    # WebSocket
    core.homeassistant.api.get_api_state = AsyncMock(
        return_value=APIState("RUNNING", False)
    )
    core.homeassistant._websocket.client = AsyncMock(
        ha_version=AwesomeVersion("2021.2.4")
    )

    # Tests marked no_mock_init_websession keep the real init_websession
    if not request.node.get_closest_marker("no_mock_init_websession"):
        core.init_websession = AsyncMock()

    # Don't remove files/folders related to addons and stores
    with patch("supervisor.store.git.GitRepo.remove"):
        yield core

    await core.dbus.unload()
|
|
|
|
|
|
@pytest.fixture
async def ha_ws_client(coresys: CoreSys) -> AsyncMock:
    """Return the Home Assistant websocket client mock, ready for assertions."""
    # WS events are only delivered while Supervisor core state is RUNNING
    await coresys.core.set_state(CoreState.RUNNING)
    await asyncio.sleep(0)

    ws_client = coresys.homeassistant.websocket.client
    # Forget commands recorded before the test body runs
    ws_client.async_send_command.reset_mock()
    return ws_client
|
|
|
|
|
|
@pytest.fixture
async def tmp_supervisor_data(coresys: CoreSys, tmp_path: Path) -> Path:
    """Point the supervisor data path at ``tmp_path`` and create its folders.

    Yields ``tmp_path`` while ``CoreConfig.path_supervisor`` is patched.
    """
    # (config attribute, create parents) in creation order
    folders = (
        ("path_emergency", False),
        ("path_media", False),
        ("path_mounts", False),
        ("path_mounts_credentials", False),
        ("path_backup", False),
        ("path_tmp", False),
        ("path_homeassistant", False),
        ("path_audio", False),
        ("path_dns", False),
        ("path_share", False),
        ("path_addons_data", True),
        ("path_addon_configs", True),
        ("path_ssl", False),
        ("path_core_backup", True),
        ("path_cid_files", False),
    )
    with patch.object(
        su_config.CoreConfig, "path_supervisor", new=PropertyMock(return_value=tmp_path)
    ):
        for attribute, with_parents in folders:
            getattr(coresys.config, attribute).mkdir(parents=with_parents)
        yield tmp_path
|
|
|
|
|
|
@pytest.fixture
async def journald_gateway() -> AsyncGenerator[MagicMock]:
    """Mock the journald gateway HTTP client used by logs control.

    Yields the mocked response: its ``content`` is an ``asyncio.StreamReader``
    tests can feed, ``text()`` decodes what is read from it, ``status`` is 200
    and ``get`` is the patched ``ClientSession.get``.
    """
    with (
        patch("supervisor.host.logs.Path.is_socket", return_value=True),
        patch("supervisor.host.logs.ClientSession.get") as get,
    ):
        stream = asyncio.StreamReader(loop=asyncio.get_running_loop())
        response = MagicMock(content=stream, get=get)

        async def _read_text():
            raw = await response.content.read()
            return raw.decode("utf-8")

        response.text = _read_text
        response.status = 200

        # Entering the get() result yields the response, and entering the
        # response itself yields the response again.
        get.return_value.__aenter__.return_value = response
        response.__aenter__.return_value = response
        yield response
|
|
|
|
|
|
@pytest.fixture
async def without_journal_gatewayd_boots() -> AsyncGenerator[MagicMock]:
    """Make the native boot id lookup of LogsControl raise HostLogError."""

    def _fail(*args, **kwargs):
        raise HostLogError("Mocked error")

    with patch(
        "supervisor.host.logs.LogsControl._get_boot_ids_native", side_effect=_fail
    ) as boot_ids_mock:
        yield boot_ids_mock
|
|
|
|
|
|
@pytest.fixture
async def journal_logs_reader() -> MagicMock:
    """Patch ``journal_logs_reader`` in the host API module and yield the mock."""
    with patch("supervisor.api.host.journal_logs_reader") as reader_mock:
        yield reader_mock
|
|
|
|
|
|
@pytest.fixture
def sys_machine():
    """Patch ``CoreSys.machine`` with a PropertyMock and yield it."""
    with patch(
        "supervisor.coresys.CoreSys.machine", new_callable=PropertyMock
    ) as machine_mock:
        yield machine_mock
|
|
|
|
|
|
@pytest.fixture
def sys_supervisor():
    """Patch ``CoreSys.supervisor`` with a PropertyMock and yield it.

    The property returns a ``MagicMock``; tests can reach it through
    ``sys_supervisor.return_value``.
    """
    with patch(
        "supervisor.coresys.CoreSys.supervisor", new_callable=PropertyMock
    ) as mock:
        mock.return_value = MagicMock()
        # Yield the configured property mock rather than the MagicMock class:
        # attributes set on the class would leak into every other mock.
        yield mock
|
|
|
|
|
|
@pytest.fixture
async def api_client(
    aiohttp_client,
    coresys: CoreSys,
    request: pytest.FixtureRequest,
) -> TestClient:
    """Yield a test client for the RestAPI web application.

    When parametrized, the parameter is the slug of the add-on requests are
    attributed to; otherwise requests are attributed to Home Assistant.
    """
    addon_slug: str | None = getattr(request, "param", None)

    @web.middleware
    async def _security_middleware(req: web.Request, handler: web.RequestHandler):
        """Mark requests as coming from Core or the specified add-on."""
        req[REQUEST_FROM] = (
            coresys.addons.get(addon_slug, local_only=True)
            if addon_slug
            else coresys.homeassistant
        )
        return await handler(req)

    rest_api = RestAPI(coresys)
    rest_api.webapp = web.Application(middlewares=[_security_middleware])
    rest_api.start = AsyncMock()
    await rest_api.load()

    client = await aiohttp_client(rest_api.webapp)
    yield client
|
|
|
|
|
|
@pytest.fixture
def supervisor_internet(coresys: CoreSys) -> Generator[AsyncMock]:
    """Simulate a working Supervisor internet connection.

    Replaces ``coresys.supervisor.check_connectivity`` with an AsyncMock
    returning True and yields that mock.
    """
    check_mock = AsyncMock(return_value=True)
    coresys.supervisor.check_connectivity = check_mock
    yield check_mock
|
|
|
|
|
|
@pytest.fixture
def websession(coresys: CoreSys) -> Generator[MagicMock]:
    """Swap the global aiohttp ClientSession for a spec'd mock and yield it."""
    session_mock = MagicMock(spec_set=ClientSession)
    coresys._websession = session_mock
    yield coresys._websession
|
|
|
|
|
|
@pytest.fixture
def mock_update_data(websession: MagicMock) -> Generator[MockResponse]:
    """Serve the stable version fixture from the mocked websession.

    Yields the ``MockResponse`` returned by ``websession.get``.
    """
    response = MockResponse(text=load_fixture("version_stable.json"))
    response.status = 200
    websession.get = MagicMock(return_value=response)
    yield response
|
|
|
|
|
|
@pytest.fixture
def store_manager(coresys: CoreSys):
    """Yield the store manager while ``StoreData.update`` is patched out."""
    manager = coresys.store
    update_patch = patch(
        "supervisor.store.data.StoreData.update", return_value=MagicMock()
    )
    with update_patch:
        yield manager
|
|
|
|
|
|
@pytest.fixture
def run_supervisor_state(request: pytest.FixtureRequest) -> Generator[MagicMock]:
    """Simulate the Supervisor state file in /run/supervisor.

    With a truthy parameter (the default) the state file constant is patched;
    with a falsy one ``Core._write_run_state`` is patched instead. The
    resulting mock is yielded.
    """
    patch_state_file = getattr(request, "param", "test_file")
    target = (
        "supervisor.core.RUN_SUPERVISOR_STATE"
        if patch_state_file
        else "supervisor.core.Core._write_run_state"
    )
    with patch(target) as state_mock:
        yield state_mock
|
|
|
|
|
|
@pytest.fixture
def store_addon(coresys: CoreSys, tmp_path, test_repository):
    """Register a store add-on built from the ``add-on.json`` fixture.

    The add-on data location is set to ``tmp_path``; the ``AddonStore``
    object is yielded.
    """
    addon = AddonStore(coresys, "test_store_addon")
    coresys.addons.store[addon.slug] = addon

    store_data = coresys.store.data.addons
    store_data[addon.slug] = SCHEMA_ADDON_SYSTEM(load_json_fixture("add-on.json"))
    store_data[addon.slug]["location"] = tmp_path
    yield addon
|
|
|
|
|
|
@pytest.fixture
async def test_repository(coresys: CoreSys):
    """Load the store and register a test add-on repository.

    Git loading is patched out while the store loads; the created
    ``Repository`` is yielded.
    """
    repo_url = "https://github.com/awesome-developer/awesome-repo"
    coresys.config._data[ATTR_ADDONS_CUSTOM_LIST] = []

    with patch("supervisor.store.git.GitRepo.load", return_value=None):
        await coresys.store.load()

        repository = Repository.create(coresys, repo_url)
        coresys.store.repositories[repository.slug] = repository
        coresys.store._data[ATTR_REPOSITORIES].append(repo_url)

        yield repository
|
|
|
|
|
|
@pytest.fixture
async def install_addon_ssh(coresys: CoreSys, test_repository):
    """Install the add-on with slug ``TEST_ADDON_SLUG`` and yield it."""
    addons = coresys.addons
    store_entry = addons.store[TEST_ADDON_SLUG]
    await addons.data.install(store_entry)
    # Re-run the schema over the stored add-on data after installing
    addons.data._data = addons.data._schema(addons.data._data)

    installed = Addon(coresys, store_entry.slug)
    addons.local[installed.slug] = installed
    yield installed
|
|
|
|
|
|
@pytest.fixture
async def install_addon_example(coresys: CoreSys, test_repository):
    """Install the ``local_example`` add-on and yield it."""
    addons = coresys.addons
    store_entry = addons.store["local_example"]
    await addons.data.install(store_entry)
    # Re-run the schema over the stored add-on data after installing
    addons.data._data = addons.data._schema(addons.data._data)

    installed = Addon(coresys, store_entry.slug)
    addons.local[installed.slug] = installed
    yield installed
|
|
|
|
|
|
@pytest.fixture
async def mock_full_backup(coresys: CoreSys, tmp_path) -> Backup:
    """Register a full backup with slug ``test`` as the only known backup.

    The backup lists the ``local_ssh`` add-on, all folders and Home Assistant
    2022.8.0; the ``Backup`` object is yielded.
    """
    tar_file = Path(tmp_path, "test_backup.tar")
    full_backup = Backup(coresys, tar_file, "test", None, None, 10240)
    full_backup.new("Test", utcnow().isoformat(), BackupType.FULL)
    full_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"]
    full_backup.docker = {}

    ssh_addon = {
        ATTR_SLUG: "local_ssh",
        ATTR_NAME: "SSH",
        ATTR_VERSION: "1.0.0",
        ATTR_SIZE: 0,
    }
    full_backup._data[ATTR_ADDONS] = [ssh_addon]
    full_backup._data[ATTR_FOLDERS] = ALL_FOLDERS
    full_backup._data[ATTR_HOMEASSISTANT] = {
        ATTR_VERSION: AwesomeVersion("2022.8.0"),
        ATTR_SIZE: 0,
        ATTR_EXCLUDE_DATABASE: False,
    }

    coresys.backups._backups = {"test": full_backup}
    yield full_backup
|
|
|
|
|
|
@pytest.fixture
async def mock_partial_backup(coresys: CoreSys, tmp_path) -> Backup:
    """Register a partial backup with slug ``test`` as the only known backup.

    The backup lists the ``local_ssh`` add-on, all folders and Home Assistant
    2022.8.0; the ``Backup`` object is yielded.
    """
    tar_file = Path(tmp_path, "test_backup.tar")
    partial_backup = Backup(coresys, tar_file, "test", None, None, 10240)
    partial_backup.new("Test", utcnow().isoformat(), BackupType.PARTIAL)
    partial_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"]
    partial_backup.docker = {}

    ssh_addon = {
        ATTR_SLUG: "local_ssh",
        ATTR_NAME: "SSH",
        ATTR_VERSION: "1.0.0",
        ATTR_SIZE: 0,
    }
    partial_backup._data[ATTR_ADDONS] = [ssh_addon]
    partial_backup._data[ATTR_FOLDERS] = ALL_FOLDERS
    partial_backup._data[ATTR_HOMEASSISTANT] = {
        ATTR_VERSION: AwesomeVersion("2022.8.0"),
        ATTR_SIZE: 0,
        ATTR_EXCLUDE_DATABASE: False,
    }

    coresys.backups._backups = {"test": partial_backup}
    yield partial_backup
|
|
|
|
|
|
@pytest.fixture
async def backups(
    coresys: CoreSys, tmp_path, request: pytest.FixtureRequest
) -> list[Backup]:
    """Create mock backups ``sn1`` .. ``snN`` and yield the backup list.

    N is the fixture parameter when given, otherwise 5. Slugs ending in 1 or 5
    become partial backups, all others full backups.
    """
    count = request.param if hasattr(request, "param") else 5
    for number in range(1, count + 1):
        slug = f"sn{number}"
        tar_file = Path(tmp_path, f"{slug}.tar")
        # Opening and closing the archive for writing creates an empty tar
        with SecureTarArchive(tar_file, "w"):
            pass

        backup_type = (
            BackupType.PARTIAL if slug.endswith(("1", "5")) else BackupType.FULL
        )
        backup = Backup(coresys, tar_file, slug, None)
        backup._data = {  # pylint: disable=protected-access
            ATTR_SLUG: slug,
            ATTR_DATE: utcnow().isoformat(),
            ATTR_TYPE: backup_type,
        }
        coresys.backups._backups[backup.slug] = backup

    yield coresys.backups.list_backups
|
|
|
|
|
|
@pytest.fixture
async def journald_logs(coresys: CoreSys) -> MagicMock:
    """Make journald logs appear available and yield the ``journald_logs`` mock.

    Boot ids and identifiers are patched to fixed lists before the host logs
    component is loaded.
    """
    boot_ids = ["aaa", "bbb", "ccc"]
    identifiers = ["hassio_supervisor", "hassos-config", "kernel"]
    with (
        patch.object(LogsControl, "available", new=PropertyMock(return_value=True)),
        patch.object(LogsControl, "get_boot_ids", return_value=boot_ids),
        patch.object(LogsControl, "get_identifiers", return_value=identifiers),
        patch.object(LogsControl, "journald_logs", new=MagicMock()) as logs_mock,
    ):
        await coresys.host.logs.load()
        yield logs_mock
|
|
|
|
|
|
@pytest.fixture
async def docker_logs(container: DockerContainer, supervisor_name) -> AsyncMock:
    """Make the container log mock return the docker log fixture lines."""
    fixture_text = load_fixture("logs_docker_container.txt")
    log_mock = container.log
    log_mock.return_value = fixture_text.splitlines()
    yield log_mock
|
|
|
|
|
|
@pytest.fixture
async def capture_exception() -> Mock:
    """Yield a mock of sentry's ``capture_exception``.

    Sentry is reported as initialized while the fixture is active.
    """
    sentry = "supervisor.utils.sentry.sentry_sdk"
    with (
        patch("supervisor.utils.sentry.sentry_sdk.is_initialized", return_value=True),
        patch("supervisor.utils.sentry.sentry_sdk.capture_exception") as captured,
    ):
        yield captured
|
|
|
|
|
|
@pytest.fixture
async def os_available(request: pytest.FixtureRequest) -> None:
    """Report the OS as available.

    The reported version is the fixture parameter when given, else 16.2.
    """
    os_version = AwesomeVersion(getattr(request, "param", "16.2"))
    available_prop = PropertyMock(return_value=True)
    version_prop = PropertyMock(return_value=os_version)
    with (
        patch.object(OSManager, "available", new=available_prop),
        patch.object(OSManager, "version", new=version_prop),
    ):
        yield
|
|
|
|
|
|
def create_mock_exec_stream(output: bytes = b"") -> AsyncMock:
    """Build a mock of the stream returned by an exec started with detach=False.

    The mock is its own async context manager. With non-empty ``output``,
    ``read_out`` first returns a message object whose ``data`` is ``output``
    and then ``None`` (EOF); with empty ``output`` it always returns ``None``.
    """
    mock_stream = AsyncMock()

    # Entering the context manager hands back the stream itself
    mock_stream.__aenter__.return_value = mock_stream
    mock_stream.__aexit__.return_value = None

    if not output:
        mock_stream.read_out.return_value = None
        return mock_stream

    # One message carrying the payload, followed by EOF
    message_cls = type("Message", (), {"data": output})
    mock_stream.read_out.side_effect = [message_cls(), None]
    return mock_stream
|
|
|
|
|
|
@pytest.fixture
async def container(docker: DockerAPI) -> DockerContainer:
    """Yield the mocked docker container with exec start/inspect prepared."""
    mocked_container = docker.containers.get.return_value
    exec_mock = mocked_container.exec.return_value

    # Must be a regular function, not async, to match aiodocker's start()
    def mock_exec_start(detach=False):
        if not detach:
            # detach=False returns the mock stream synchronously
            return create_mock_exec_stream(output=b"")

        # Old behavior for detach=True (shouldn't be used anymore): the
        # caller expects an awaitable
        async def _async_start():
            return b""

        return _async_start()

    # start is replaced by a MagicMock (not AsyncMock)
    exec_start = MagicMock(side_effect=mock_exec_start)
    exec_mock.start = exec_start

    # Keep the original side_effect around so it can be restored after a
    # test overrides it
    exec_start._original_side_effect = mock_exec_start

    exec_mock.inspect.return_value = {
        "Running": False,
        "ExitCode": 0,
        "Pid": 12345,
    }

    yield mocked_container
|
|
|
|
|
|
@pytest.fixture
async def mount_propagation(container: DockerContainer, coresys: CoreSys) -> None:
    """Load the supervisor with a /data bind mount using slave propagation."""
    data_mount = {
        "Type": "bind",
        "Source": "/mnt/data/supervisor",
        "Destination": "/data",
        "Mode": "rw",
        "RW": True,
        "Propagation": "slave",
    }
    container.show.return_value["Mounts"] = [data_mount]
    await coresys.supervisor.load()
    yield
|
|
|
|
|
|
@pytest.fixture
def mock_amd64_arch_supported(coresys: CoreSys) -> None:
    """Declare amd64 as the only supported architecture."""
    arch = coresys.arch
    arch._supported_arch = [CpuArch.AMD64]
    arch._supported_set = {CpuArch.AMD64}
|
|
|
|
|
|
@pytest.fixture
def mock_aarch64_arch_supported(coresys: CoreSys) -> None:
    """Declare aarch64 as the only supported architecture."""
    # Previously set CpuArch.AMD64 (copy-paste from the amd64 fixture), which
    # contradicted the fixture's name and docstring.
    coresys.arch._supported_arch = [CpuArch.AARCH64]
    coresys.arch._supported_set = {CpuArch.AARCH64}
|
|
|
|
|
|
@pytest.fixture
def mock_is_mount() -> MagicMock:
    """Patch ``Path.is_mount`` in the mounts module to return True."""
    target = "supervisor.mounts.mount.Path.is_mount"
    with patch(target, return_value=True) as is_mount_mock:
        yield is_mount_mock
|
|
|
|
|
|
@pytest.fixture
def no_job_throttle():
    """Disable job throttling by making ``Job.last_call`` return datetime.min."""
    throttle_patch = patch(
        "supervisor.jobs.decorator.Job.last_call", return_value=datetime.min
    )
    with throttle_patch:
        yield
|