From 67ff4635eb4a38bc26a6856e076e7bb3f12ecad4 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 16 Mar 2026 13:36:22 +0100 Subject: [PATCH] Increase test coverage --- tests/host/test_manager.py | 98 +++++++++++++++++++++++++++++++++++++- tests/test_core.py | 65 +++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 1 deletion(-) diff --git a/tests/host/test_manager.py b/tests/host/test_manager.py index b5264dfb6..bdf23d4a0 100644 --- a/tests/host/test_manager.py +++ b/tests/host/test_manager.py @@ -1,7 +1,8 @@ """Test host manager.""" import asyncio -from unittest.mock import patch +import os +from unittest.mock import AsyncMock, PropertyMock, patch from awesomeversion import AwesomeVersion import pytest @@ -9,6 +10,7 @@ import pytest from supervisor.const import CoreState from supervisor.coresys import CoreSys from supervisor.dbus.const import MulticastProtocolEnabled +from supervisor.exceptions import DBusError from tests.dbus_service_mocks.base import DBusServiceMock from tests.dbus_service_mocks.logind import Logind as LogindService @@ -131,3 +133,97 @@ async def test_host_shutdown_signal_reentrant( # shutdown() is called reentrantly - it awaits the in-progress shutdown async with asyncio.timeout(2): await shutdown_called.wait() + + +async def test_host_unload_cancels_monitor_task( + coresys: CoreSys, logind_service: LogindService +): + """Test unload cancels the shutdown monitor task.""" + await coresys.host.load() + await asyncio.sleep(0.1) + + assert coresys.host._shutdown_monitor_task is not None + assert not coresys.host._shutdown_monitor_task.done() + + await coresys.host.unload() + + assert coresys.host._shutdown_monitor_task is None + + +async def test_host_unload_no_monitor_task(coresys: CoreSys): + """Test unload when no monitor task was started.""" + # Don't call load(), so no monitor task exists + assert coresys.host._shutdown_monitor_task is None + await coresys.host.unload() + assert coresys.host._shutdown_monitor_task is None + + +async def test_monitor_inhibit_lock_failure( + coresys: CoreSys, + logind_service: LogindService, + caplog: pytest.LogCaptureFixture, +): + """Test monitor task logs warning when inhibit lock fails.""" + with patch.object( + coresys.dbus.logind, "inhibit", side_effect=DBusError("test error") + ): + await coresys.host.load() + await asyncio.sleep(0.1) + + assert "Could not take shutdown inhibitor lock from logind" in caplog.text + + +async def test_monitor_dbus_error_during_signal_wait( + coresys: CoreSys, + logind_service: LogindService, + caplog: pytest.LogCaptureFixture, +): + """Test monitor task handles D-Bus errors during signal monitoring.""" + with patch.object( + coresys.dbus.logind, + "prepare_for_shutdown", + side_effect=DBusError("connection lost"), + ): + await coresys.host.load() + await asyncio.sleep(0.1) + + assert "Error monitoring host shutdown signal" in caplog.text + + +async def test_inhibitor_lock_released_after_shutdown( + coresys: CoreSys, + logind_service: LogindService, + caplog: pytest.LogCaptureFixture, +): + """Test that the inhibitor lock FD is closed after shutdown completes.""" + # Mock inhibit to return a real FD (session bus doesn't negotiate unix FDs) + r_fd, w_fd = os.pipe() + os.close(w_fd) + + with patch.object( + coresys.dbus.logind, "inhibit", new_callable=AsyncMock, return_value=r_fd + ): + await coresys.host.load() + await asyncio.sleep(0.1) + + await coresys.core.set_state(CoreState.RUNNING) + + with patch.object(coresys.core, "shutdown", new_callable=AsyncMock): + logind_service.PrepareForShutdown() + await logind_service.ping() + await asyncio.sleep(0.2) + + assert "Shutdown inhibitor lock released" in caplog.text + + +async def test_no_monitor_task_without_logind(coresys: CoreSys): + """Test no monitor task is started when logind is not connected.""" + with patch.object( + type(coresys.dbus.logind), + "is_connected", + new_callable=PropertyMock, + return_value=False, + ): + await coresys.host.load() + + assert coresys.host._shutdown_monitor_task is None diff --git a/tests/test_core.py b/tests/test_core.py index a393fe875..505574bf8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,6 +1,7 @@ """Testing handling with CoreState.""" # pylint: disable=W0212 +import asyncio import datetime import errno from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch @@ -146,3 +147,67 @@ async def test_write_state_failure( assert "Can't update the Supervisor state" in caplog.text assert coresys.core.state == CoreState.RUNNING + + +async def test_shutdown_reentrant_waits(coresys: CoreSys): + """Test that concurrent shutdown calls wait for the first to complete.""" + call_count = 0 + shutdown_started = asyncio.Event() + proceed = asyncio.Event() + + original_shutdown = coresys.addons.shutdown + + async def slow_addon_shutdown(startup): + nonlocal call_count + call_count += 1 + shutdown_started.set() + await proceed.wait() + return await original_shutdown(startup) + + await coresys.core.set_state(CoreState.RUNNING) + + with patch.object(coresys.addons, "shutdown", side_effect=slow_addon_shutdown): + # Start first shutdown + task1 = asyncio.create_task(coresys.core.shutdown()) + await shutdown_started.wait() + + # Second call should wait, not start a new shutdown + task2 = asyncio.create_task(coresys.core.shutdown()) + await asyncio.sleep(0.05) + + # Let the shutdown proceed + proceed.set() + + await asyncio.gather(task1, task2) + + # Addon shutdown was only called by the first shutdown (4 startup levels) + assert call_count == 4 + assert coresys.core._shutdown_event.is_set() + + +async def test_shutdown_event_reset_between_cycles(coresys: CoreSys): + """Test that shutdown event is reset for repeated shutdown cycles (e.g. backup restore).""" + await coresys.core.set_state(CoreState.FREEZE) + + # First shutdown cycle + await coresys.core.shutdown() + assert coresys.core._shutdown_event.is_set() + + # Simulate backup restore returning to RUNNING + await coresys.core.set_state(CoreState.RUNNING) + + # Second shutdown cycle should work (event was cleared) + second_entered = False + + original_shutdown = coresys.addons.shutdown + + async def track_addon_shutdown(startup): + nonlocal second_entered + second_entered = True + return await original_shutdown(startup) + + with patch.object(coresys.addons, "shutdown", side_effect=track_addon_shutdown): + await coresys.core.shutdown() + + assert second_entered + assert coresys.core._shutdown_event.is_set()