1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-07-02 11:25:37 +01:00
Files
supervisor/tests/api/test_supervisor.py
Stefan Agner 9f553c327c Only force a versioned Supervisor update in DEV mode (#6903)
Home Assistant Core now triggers a versionless Supervisor update during
onboarding to ensure Supervisor is current before it continues setup. It
treats a "no update available" response as the signal to proceed.

In DEV mode the update endpoint bypassed the need_update check entirely
and resolved a versionless request to the latest published version. So
Core's onboarding call made a freshly-built DEV Supervisor install the
latest published build and restart, which breaks the run_supervisor CI
job (the API disappears mid-test with "connection refused").

Tie the bypass to an explicit version instead: specifying a version is
still DEV-only, but a versionless request now always respects
need_update. Since need_update is always False in DEV, Core's onboarding
call becomes a no-op there, avoiding the update to the latest published
Supervisor on the dev channel.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 13:26:22 +02:00

618 lines
21 KiB
Python

"""Test Supervisor API."""
# pylint: disable=protected-access
import time
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
import aiodocker
from aiodocker.containers import DockerContainer
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
from blockbuster import BlockingError
import pytest
from supervisor.const import CoreState, FeatureFlag
from supervisor.core import Core
from supervisor.coresys import CoreSys
from supervisor.exceptions import HassioError, HostNotSupportedError, StoreGitError
from supervisor.homeassistant.const import WSEvent
from supervisor.store.repository import Repository
from supervisor.supervisor import Supervisor
from supervisor.updater import Updater
from tests.common import AsyncIterator, load_json_fixture
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService
# Repository URL posted as "addons_repositories" by the repository option tests.
REPO_URL = "https://github.com/awesome-developer/awesome-repo"
async def test_api_supervisor_options_debug(
    api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
    """Test enabling the debug option via POST /supervisor/options REST API."""
    api_client, prefix = api_client_with_prefix
    assert not coresys.config.debug

    response = await api_client.post(
        f"{prefix}/supervisor/options", json={"debug": True}
    )

    # Check the status as well, so a rejected request is reported as such
    # instead of surfacing only as an unchanged config value.
    assert response.status == 200
    assert coresys.config.debug
async def test_api_supervisor_options_add_repository(
    api_client_with_prefix: tuple[TestClient, str],
    coresys: CoreSys,
    supervisor_internet: AsyncMock,
):
    """Test that POST /supervisor/options registers a new repository URL."""
    client, route_prefix = api_client_with_prefix
    payload = {"addons_repositories": [REPO_URL]}

    # Precondition: the repository URL is not known to the store yet.
    assert REPO_URL not in coresys.store.repository_urls

    # RepositoryGit.load and RepositoryGit.validate are mocked for the
    # duration of the request.
    load_patch = patch(
        "supervisor.store.repository.RepositoryGit.load", return_value=None
    )
    validate_patch = patch(
        "supervisor.store.repository.RepositoryGit.validate", return_value=True
    )
    with load_patch, validate_patch:
        result = await client.post(f"{route_prefix}/supervisor/options", json=payload)

    assert result.status == 200
    assert REPO_URL in coresys.store.repository_urls
async def test_api_supervisor_options_remove_repository(
    api_client_with_prefix: tuple[TestClient, str],
    coresys: CoreSys,
    test_repository: Repository,
):
    """Test that posting an empty repository list drops an existing repository."""
    client, route_prefix = api_client_with_prefix
    options_url = f"{route_prefix}/supervisor/options"

    # Precondition: the fixture repository is registered by source and slug.
    assert test_repository.source in coresys.store.repository_urls
    assert test_repository.slug in coresys.store.repositories

    result = await client.post(options_url, json={"addons_repositories": []})
    assert result.status == 200

    # Both lookups must no longer find the repository.
    assert test_repository.source not in coresys.store.repository_urls
    assert test_repository.slug not in coresys.store.repositories
@pytest.mark.parametrize("git_error", [None, StoreGitError()])
async def test_api_supervisor_options_repositories_skipped_on_error(
    api_client_with_prefix: tuple[TestClient, str],
    coresys: CoreSys,
    git_error: StoreGitError | None,
):
    """Test repositories skipped on error via POST /supervisor/options REST API.

    Two failure modes are covered by the parametrization:

    * ``git_error`` is ``None``: the mocked ``RepositoryGit.load`` returns
      normally and the mocked ``RepositoryGit.validate`` returns ``False``.
    * ``git_error`` is a ``StoreGitError``: the mocked ``RepositoryGit.load``
      raises it.

    In both cases the request must be answered with 400, no suggestion may be
    recorded and the repository must not end up in the store.
    """
    api_client, prefix = api_client_with_prefix

    with (
        # side_effect=None leaves the mock returning normally.
        patch("supervisor.store.repository.RepositoryGit.load", side_effect=git_error),
        patch("supervisor.store.repository.RepositoryGit.validate", return_value=False),
        patch("supervisor.store.repository.RepositoryCustom.remove"),
    ):
        response = await api_client.post(
            f"{prefix}/supervisor/options", json={"addons_repositories": [REPO_URL]}
        )

    assert response.status == 400
    assert len(coresys.resolution.suggestions) == 0
    assert REPO_URL not in coresys.store.repository_urls
async def test_api_supervisor_options_repo_error_with_config_change(
    api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
    """Test that a config change is kept when adding a repository fails.

    The request combines a debug change with a repository whose mocked
    ``RepositoryGit.load`` raises ``StoreGitError``.
    """
    client, route_prefix = api_client_with_prefix
    payload = {"debug": True, "addons_repositories": [REPO_URL]}
    assert not coresys.config.debug

    failing_load = patch(
        "supervisor.store.repository.RepositoryGit.load", side_effect=StoreGitError()
    )
    with failing_load:
        result = await client.post(f"{route_prefix}/supervisor/options", json=payload)

    # The repository part of the request is rejected ...
    assert result.status == 400
    assert REPO_URL not in coresys.store.repository_urls

    # ... while the debug change is applied and both save hooks ran once.
    assert coresys.config.debug
    coresys.updater.save_data.assert_called_once()
    coresys.config.save_data.assert_called_once()
async def test_api_supervisor_options_auto_update(
    api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
    """Test that auto update can be switched off through the options endpoint."""
    client, route_prefix = api_client_with_prefix
    options_url = f"{route_prefix}/supervisor/options"

    # Auto update starts out enabled.
    assert coresys.updater.auto_update is True

    result = await client.post(options_url, json={"auto_update": False})

    assert result.status == 200
    assert coresys.updater.auto_update is False
async def test_api_supervisor_options_diagnostics(
    api_client_with_prefix: tuple[TestClient, str],
    coresys: CoreSys,
    os_agent_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):
    """Test switching diagnostics on and then off again via the options endpoint."""
    client, route_prefix = api_client_with_prefix
    options_url = f"{route_prefix}/supervisor/options"
    os_agent_service: OSAgentService = os_agent_services["os_agent"]

    # Start from a known state: diagnostics disabled on the mocked OS Agent.
    os_agent_service.Diagnostics = False
    await os_agent_service.ping()
    assert coresys.dbus.agent.diagnostics is False

    # Each phase pairs the requested value with the hook expected to be
    # called exactly once while the request is processed.
    phases = (
        (True, "supervisor.utils.sentry.sentry_sdk.init"),
        (False, "supervisor.api.supervisor.close_sentry"),
    )
    for enabled, hook_target in phases:
        with patch(hook_target) as hook:
            result = await client.post(options_url, json={"diagnostics": enabled})
            assert result.status == 200
            hook.assert_called_once()

        await os_agent_service.ping()
        assert coresys.dbus.agent.diagnostics is enabled
async def test_api_supervisor_logs(advanced_logs_tester):
    """Test supervisor logs through the advanced_logs_tester fixture."""
    tester_args = ("/supervisor", "hassio_supervisor")
    await advanced_logs_tester(*tester_args)
async def test_api_supervisor_fallback(
    api_client_with_prefix: tuple[TestClient, str],
    journald_logs: MagicMock,
    docker_logs: MagicMock,
):
    """Test that supervisor logs are still served when the journald lookup fails.

    The journald mock is made to raise, and every logs endpoint must still
    answer 200 with plain text while logging the failure exactly once.
    """
    client, route_prefix = api_client_with_prefix
    logs_url = f"{route_prefix}/supervisor/logs"

    async def fetch_expecting_fallback(url: str):
        """Request a logs URL and verify the failure log plus the plain text reply."""
        with patch("supervisor.api._LOGGER.exception") as log_exception:
            reply = await client.get(url)
            log_exception.assert_called_once_with(
                "Failed to get supervisor logs using advanced_logs API"
            )

        assert reply.status == 200
        assert reply.content_type == "text/plain"
        return reply

    journald_logs.side_effect = HassioError("Something bad happened!")

    reply = await fetch_expecting_fallback(logs_url)
    body = await reply.read()
    first_two_lines = body.split(b"\n")[0:2]
    assert first_two_lines == [
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os\x1b[0m",
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os/AppArmor\x1b[0m",
    ]

    # The /follow endpoint must fall back the same way (no mock reset needed).
    await fetch_expecting_fallback(f"{route_prefix}/supervisor/logs/follow")

    # Likewise for the /latest endpoint.
    await fetch_expecting_fallback(f"{route_prefix}/supervisor/logs/latest")

    # A generic Python error must be handled like the HassioError above.
    journald_logs.side_effect = OSError("Something bad happened!")
    await fetch_expecting_fallback(logs_url)
async def test_api_supervisor_fallback_log_capture(
    api_client_with_prefix: tuple[TestClient, str],
    journald_logs: MagicMock,
    docker_logs: MagicMock,
):
    """Test that only unexpected errors are handed to async_capture_exception."""
    client, route_prefix = api_client_with_prefix
    logs_url = f"{route_prefix}/supervisor/logs"
    capture_target = "supervisor.api.async_capture_exception"

    # HostNotSupportedError must not be captured.
    journald_logs.side_effect = HostNotSupportedError(
        "No systemd-journal-gatewayd Unix socket available!"
    )
    with patch(capture_target) as capture_exception:
        await client.get(logs_url)
        capture_exception.assert_not_called()

    journald_logs.reset_mock()

    # A plain HassioError must be captured exactly once.
    journald_logs.side_effect = HassioError("Something bad happened!")
    with patch(capture_target) as capture_exception:
        await client.get(logs_url)
        capture_exception.assert_called_once()
async def test_api_supervisor_reload(
    api_client_with_prefix: tuple[TestClient, str],
    supervisor_internet: AsyncMock,
    websession: MagicMock,
):
    """Test that POST /supervisor/reload succeeds with an empty data payload."""
    client, route_prefix = api_client_with_prefix
    expected_body = {"result": "ok", "data": {}}

    reply = await client.post(f"{route_prefix}/supervisor/reload")

    assert reply.status == 200
    assert await reply.json() == expected_body
async def test_api_supervisor_options_timezone(
    api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
    """Test that the timezone can be changed through the options endpoint."""
    client, route_prefix = api_client_with_prefix
    new_timezone = "Europe/Zurich"

    # The test starts out on UTC.
    assert coresys.timezone == "Etc/UTC"

    reply = await client.post(
        f"{route_prefix}/supervisor/options", json={"timezone": new_timezone}
    )

    assert reply.status == 200
    assert coresys.timezone == new_timezone
async def test_api_supervisor_options_country(
    api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
    """Test that a country set via options is stored and reported by info."""
    client, route_prefix = api_client_with_prefix
    country_code = "CH"
    assert coresys.config.country is None

    # Store the country through the options endpoint.
    reply = await client.post(
        f"{route_prefix}/supervisor/options", json={"country": country_code}
    )
    assert reply.status == 200
    assert coresys.config.country == country_code

    # The info endpoint must report the same value.
    reply = await client.get(f"{route_prefix}/supervisor/info")
    assert reply.status == 200
    info = await reply.json()
    assert info["data"]["country"] == country_code
@pytest.mark.parametrize(
    ("blockbuster", "option_value", "config_value"),
    [
        ("no_blockbuster", "on", False),
        ("no_blockbuster", "on-at-startup", True),
    ],
    indirect=["blockbuster"],
)
async def test_api_supervisor_options_blocking_io(
    api_client_with_prefix: tuple[TestClient, str],
    coresys: CoreSys,
    option_value: str,
    config_value: bool,
):
    """Test toggling blocking I/O detection through the options endpoint."""
    client, route_prefix = api_client_with_prefix
    options_url = f"{route_prefix}/supervisor/options"
    info_url = f"{route_prefix}/supervisor/info"

    # Detection has not been switched on yet, so this must not raise.
    time.sleep(0)

    reply = await client.post(options_url, json={"detect_blocking_io": option_value})
    assert reply.status == 200

    reply = await client.get(info_url)
    assert reply.status == 200
    info = await reply.json()
    assert info["data"]["detect_blocking_io"] is True
    # The stored config value is parametrized: False is expected for "on",
    # True for "on-at-startup".
    assert coresys.config.detect_blocking_io is config_value

    # With detection active the same call is expected to raise.
    with pytest.raises(BlockingError):
        time.sleep(0)

    reply = await client.post(options_url, json={"detect_blocking_io": "off"})
    assert reply.status == 200

    reply = await client.get(info_url)
    assert reply.status == 200
    info = await reply.json()
    assert info["data"]["detect_blocking_io"] is False
    assert coresys.config.detect_blocking_io is False

    # Detection is off again, so no blocking error may be raised any more.
    time.sleep(0)
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_api_progress_updates_supervisor_update(
    api_client_with_prefix: tuple[TestClient, str],
    coresys: CoreSys,
    ha_ws_client: AsyncMock,
):
    """Test the job progress events sent to Home Assistant during an update."""
    client, route_prefix = api_client_with_prefix

    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.core.set_state(CoreState.RUNNING)

    # The image pull is replayed from a recorded log fixture.
    pull_log = load_json_fixture("docker_pull_image_log.json")
    coresys.docker.images.pull.return_value = AsyncIterator(pull_log)

    installed_version = PropertyMock(return_value=AwesomeVersion("2025.08.0"))
    latest_version = PropertyMock(return_value=AwesomeVersion("2025.08.3"))
    image_name = PropertyMock(return_value="supervisor")
    with (
        patch.object(Supervisor, "version", new=installed_version),
        patch.object(Updater, "version_supervisor", new=latest_version),
        patch.object(Updater, "image_supervisor", new=image_name),
        patch.object(Supervisor, "update_apparmor"),
        patch.object(Core, "stop"),
    ):
        reply = await client.post(f"{route_prefix}/supervisor/update")

    assert reply.status == 200

    # Keep only the job events that belong to the supervisor_update job.
    job_events = []
    for sent in ha_ws_client.async_send_command.call_args_list:
        message = sent.args[0]
        if "data" not in message:
            continue
        if message["data"]["event"] != WSEvent.JOB:
            continue
        job = message["data"]["data"]
        if job["name"] != "supervisor_update":
            continue
        job_events.append(
            {
                "stage": job["stage"],
                "progress": job["progress"],
                "done": job["done"],
            }
        )

    def in_progress(progress):
        """Build the expected event for an unfinished job at the given progress."""
        return {"stage": None, "progress": progress, "done": False}

    # NOTE(review): the original comment describes progress as count-based,
    # with 2 layers needing a pull (each worth 50%) and already existing layers
    # excluded from the calculation - confirm against the recorded fixture.
    assert job_events[:4] == [in_progress(value) for value in (0, 9.2, 25.6, 35.4)]
    assert job_events[-5:] == [
        *(in_progress(value) for value in (95.5, 96.9, 98.2, 100)),
        {"stage": None, "progress": 100, "done": True},
    ]
@pytest.mark.parametrize("dev", [True, False])
async def test_api_supervisor_update_no_update_available(
    api_client_with_prefix: tuple[TestClient, str],
    dev: bool,
):
    """Test that a versionless update does nothing when no update is available.

    Home Assistant Core sends a versionless Supervisor update request during
    onboarding. When need_update is False (always the case in DEV) the
    endpoint has to report that no update is available instead of installing
    the latest published version.
    """
    client, route_prefix = api_client_with_prefix

    dev_flag = PropertyMock(return_value=dev)
    no_update_needed = PropertyMock(return_value=False)
    with (
        patch.object(CoreSys, "dev", new=dev_flag),
        patch.object(Supervisor, "need_update", new=no_update_needed),
        patch.object(Supervisor, "update") as update,
    ):
        reply = await client.post(f"{route_prefix}/supervisor/update")

    assert reply.status == 400
    body = await reply.json()
    assert "No supervisor update available" in body["message"]
    update.assert_not_called()
async def test_api_supervisor_update_dev_explicit_version(
    api_client_with_prefix: tuple[TestClient, str],
):
    """Test that DEV mode accepts an explicit version although need_update is False."""
    client, route_prefix = api_client_with_prefix
    requested_version = "2025.08.3"

    dev_enabled = PropertyMock(return_value=True)
    no_update_needed = PropertyMock(return_value=False)
    with (
        patch.object(CoreSys, "dev", new=dev_enabled),
        patch.object(Supervisor, "need_update", new=no_update_needed),
        patch.object(Supervisor, "update") as update,
    ):
        reply = await client.post(
            f"{route_prefix}/supervisor/update", json={"version": requested_version}
        )

    assert reply.status == 200
    # The requested version is passed on to Supervisor.update as AwesomeVersion.
    update.assert_called_once_with(AwesomeVersion(requested_version))
async def test_api_supervisor_stats(
    api_client_with_prefix: tuple[TestClient, str], container: DockerContainer
):
    """Test the values reported by GET /supervisor/stats."""
    client, route_prefix = api_client_with_prefix
    expected_stats = {
        "cpu_percent": 90.0,
        "memory_usage": 59700000,
        "memory_limit": 4000000000,
        "memory_percent": 1.49,
    }

    # Present the mocked container as running and feed it recorded stats.
    container.show.return_value["State"]["Status"] = "running"
    container.show.return_value["State"]["Running"] = True
    recorded_stats = [load_json_fixture("container_stats.json")]
    container.stats = AsyncMock(return_value=recorded_stats)

    reply = await client.get(f"{route_prefix}/supervisor/stats")
    assert reply.status == 200

    body = await reply.json()
    for field, expected in expected_stats.items():
        assert body["data"][field] == expected
async def test_supervisor_api_stats_failure(
    api_client_with_prefix: tuple[TestClient, str],
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
):
    """Test the error response and log output when the container lookup fails."""
    client, route_prefix = api_client_with_prefix
    expected_message = (
        "An unknown error occurred with Supervisor. Check Supervisor logs for details"
    )
    expected_log = (
        "Could not inspect container 'hassio_supervisor': [500] {'message': 'fail'}"
    )

    # Make the mocked Docker container lookup raise.
    docker_failure = aiodocker.DockerError(500, {"message": "fail"})
    coresys.docker.containers.get.side_effect = docker_failure

    reply = await client.get(f"{route_prefix}/supervisor/stats")
    assert reply.status == 500

    body = await reply.json()
    assert body["message"] == expected_message
    assert body["error_key"] == "supervisor_unknown_error"
    assert "extra_fields" not in body
    assert expected_log in caplog.text
async def test_api_supervisor_info_feature_flags(
    api_client: TestClient, coresys: CoreSys
):
    """Test that info lists every known feature flag, each defaulting to False."""
    reply = await api_client.get("/supervisor/info")
    assert reply.status == 200

    info = (await reply.json())["data"]
    assert "feature_flags" in info
    reported_flags = info["feature_flags"]

    # Every member of FeatureFlag has to be listed and switched off.
    for flag_name in (flag.value for flag in FeatureFlag):
        assert flag_name in reported_flags
        assert reported_flags[flag_name] is False
async def test_api_supervisor_options_feature_flags_enable(
    api_client: TestClient, coresys: CoreSys
):
    """Test that a feature flag can be switched on through the options endpoint."""
    payload = {"feature_flags": {"supervisor_v2_api": True}}

    # The flag must not be enabled before the request.
    assert not coresys.config.feature_flags.get(FeatureFlag.SUPERVISOR_V2_API)

    result = await api_client.post("/supervisor/options", json=payload)

    assert result.status == 200
    assert coresys.config.feature_flags.get(FeatureFlag.SUPERVISOR_V2_API) is True
async def test_api_supervisor_options_feature_flags_disable(
    api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
    """Test that a feature flag can be switched off through the options endpoint."""
    client, route_prefix = api_client_with_prefix
    payload = {"feature_flags": {"supervisor_v2_api": False}}

    # Enable the flag first so the request has something to turn off.
    coresys.config.set_feature_flag(FeatureFlag.SUPERVISOR_V2_API, True)
    assert coresys.config.feature_flags.get(FeatureFlag.SUPERVISOR_V2_API) is True

    result = await client.post(f"{route_prefix}/supervisor/options", json=payload)

    assert result.status == 200
    assert coresys.config.feature_flags.get(FeatureFlag.SUPERVISOR_V2_API) is False
async def test_api_supervisor_options_feature_flags_partial_update(
    api_client_with_prefix: tuple[TestClient, str], coresys: CoreSys
):
    """Test that options without a feature_flags key keep the existing flags."""
    client, route_prefix = api_client_with_prefix
    coresys.config.set_feature_flag(FeatureFlag.SUPERVISOR_V2_API, True)

    # The payload deliberately carries no feature_flags entry.
    payload = {"debug": False}
    result = await client.post(f"{route_prefix}/supervisor/options", json=payload)
    assert result.status == 200

    # The flag set above must still be enabled.
    assert coresys.config.feature_flags.get(FeatureFlag.SUPERVISOR_V2_API) is True
async def test_api_supervisor_options_feature_flags_unknown_flag(
    api_client_with_prefix: tuple[TestClient, str],
):
    """Test that the options endpoint answers 400 for an unknown feature flag."""
    client, route_prefix = api_client_with_prefix
    payload = {"feature_flags": {"unknown_feature": True}}

    result = await client.post(f"{route_prefix}/supervisor/options", json=payload)

    assert result.status == 400