mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-02 00:07:16 +01:00
Add firewall rules to protect Docker gateway from external access (#6650)
Add iptables rules via a systemd transient unit to drop traffic addressed to the bridge gateway IP from non-bridge interfaces. The firewall manager waits for the transient unit to complete and verifies success via D-Bus property change signals. On failure, the system is marked unhealthy and host-network add-ons are prevented from booting. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -22,7 +22,7 @@ from ..exceptions import (
|
||||
from ..jobs import ChildJobSyncFilter
|
||||
from ..jobs.const import JobConcurrency
|
||||
from ..jobs.decorator import Job, JobCondition
|
||||
from ..resolution.const import ContextType, IssueType, SuggestionType
|
||||
from ..resolution.const import ContextType, IssueType, SuggestionType, UnhealthyReason
|
||||
from ..store.addon import AddonStore
|
||||
from ..utils.sentry import async_capture_exception
|
||||
from .addon import Addon
|
||||
@@ -110,6 +110,17 @@ class AddonManager(CoreSysAttributes):
|
||||
for addon in self.installed:
|
||||
if addon.boot != AddonBoot.AUTO or addon.startup != stage:
|
||||
continue
|
||||
if (
|
||||
addon.host_network
|
||||
and UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED
|
||||
in self.sys_resolution.unhealthy
|
||||
):
|
||||
_LOGGER.warning(
|
||||
"Skipping boot of add-on %s because gateway firewall"
|
||||
" rules are not active",
|
||||
addon.slug,
|
||||
)
|
||||
continue
|
||||
tasks.append(addon)
|
||||
|
||||
# Evaluate add-ons which need to be started
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from functools import wraps
|
||||
import logging
|
||||
from typing import NamedTuple
|
||||
|
||||
from dbus_fast import Variant
|
||||
from dbus_fast.aio.message_bus import MessageBus
|
||||
@@ -36,6 +37,14 @@ from .utils import dbus_connected
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExecStartEntry(NamedTuple):
|
||||
"""Systemd ExecStart entry for transient units (D-Bus type signature 'sasb')."""
|
||||
|
||||
binary: str
|
||||
argv: list[str]
|
||||
ignore_failure: bool
|
||||
|
||||
|
||||
def systemd_errors(func):
|
||||
"""Wrap systemd dbus methods to handle its specific error types."""
|
||||
|
||||
|
||||
161
supervisor/host/firewall.py
Normal file
161
supervisor/host/firewall.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Firewall rules for the Supervisor network gateway."""
|
||||
|
||||
import asyncio
|
||||
from contextlib import suppress
|
||||
import logging
|
||||
|
||||
from dbus_fast import Variant
|
||||
|
||||
from ..const import DOCKER_IPV4_NETWORK_MASK, DOCKER_IPV6_NETWORK_MASK, DOCKER_NETWORK
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..dbus.const import (
|
||||
DBUS_ATTR_ACTIVE_STATE,
|
||||
DBUS_IFACE_SYSTEMD_UNIT,
|
||||
StartUnitMode,
|
||||
UnitActiveState,
|
||||
)
|
||||
from ..dbus.systemd import ExecStartEntry
|
||||
from ..exceptions import DBusError
|
||||
from ..resolution.const import UnhealthyReason
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
FIREWALL_SERVICE = "supervisor-firewall-gateway.service"
|
||||
FIREWALL_UNIT_TIMEOUT = 30
|
||||
BIN_SH = "/bin/sh"
|
||||
IPTABLES_CMD = "/usr/sbin/iptables"
|
||||
IP6TABLES_CMD = "/usr/sbin/ip6tables"
|
||||
|
||||
TERMINAL_STATES = {UnitActiveState.INACTIVE, UnitActiveState.FAILED}
|
||||
|
||||
|
||||
class FirewallManager(CoreSysAttributes):
|
||||
"""Manage firewall rules to protect the Supervisor network gateway.
|
||||
|
||||
Adds iptables rules in the raw PREROUTING chain to drop traffic addressed
|
||||
to the bridge gateway IP that does not originate from the bridge or
|
||||
loopback interfaces.
|
||||
"""
|
||||
|
||||
def __init__(self, coresys: CoreSys) -> None:
|
||||
"""Initialize firewall manager."""
|
||||
self.coresys: CoreSys = coresys
|
||||
|
||||
@staticmethod
|
||||
def _build_exec_start() -> list[ExecStartEntry]:
|
||||
"""Build ExecStart entries for gateway firewall rules.
|
||||
|
||||
Each entry uses shell check-or-insert logic for idempotency.
|
||||
We insert DROP first, then ACCEPT, using -I (insert at top).
|
||||
The last inserted rule ends up first in the chain, so ACCEPT
|
||||
for loopback ends up above the DROP for non-bridge interfaces.
|
||||
"""
|
||||
gateway_ipv4 = str(DOCKER_IPV4_NETWORK_MASK[1])
|
||||
gateway_ipv6 = str(DOCKER_IPV6_NETWORK_MASK[1])
|
||||
bridge = DOCKER_NETWORK
|
||||
|
||||
entries: list[ExecStartEntry] = []
|
||||
for cmd, gateway in (
|
||||
(IPTABLES_CMD, gateway_ipv4),
|
||||
(IP6TABLES_CMD, gateway_ipv6),
|
||||
):
|
||||
# DROP packets to gateway from non-bridge, non-loopback interfaces
|
||||
entries.append(
|
||||
ExecStartEntry(
|
||||
binary=BIN_SH,
|
||||
argv=[
|
||||
BIN_SH,
|
||||
"-c",
|
||||
f"{cmd} -t raw -C PREROUTING ! -i {bridge} -d {gateway}"
|
||||
f" -j DROP 2>/dev/null"
|
||||
f" || {cmd} -t raw -I PREROUTING ! -i {bridge} -d {gateway}"
|
||||
f" -j DROP",
|
||||
],
|
||||
ignore_failure=False,
|
||||
)
|
||||
)
|
||||
|
||||
# ACCEPT loopback traffic to gateway (inserted last, ends up first)
|
||||
entries.append(
|
||||
ExecStartEntry(
|
||||
binary=BIN_SH,
|
||||
argv=[
|
||||
BIN_SH,
|
||||
"-c",
|
||||
f"{cmd} -t raw -C PREROUTING -i lo -d {gateway}"
|
||||
f" -j ACCEPT 2>/dev/null"
|
||||
f" || {cmd} -t raw -I PREROUTING -i lo -d {gateway}"
|
||||
f" -j ACCEPT",
|
||||
],
|
||||
ignore_failure=False,
|
||||
)
|
||||
)
|
||||
|
||||
return entries
|
||||
|
||||
async def _apply_gateway_firewall_rules(self) -> bool:
|
||||
"""Apply iptables rules to restrict access to the Docker gateway.
|
||||
|
||||
Returns True if the rules were successfully applied.
|
||||
"""
|
||||
if not self.sys_dbus.systemd.is_connected:
|
||||
_LOGGER.error("Systemd not available, cannot apply gateway firewall rules")
|
||||
return False
|
||||
|
||||
# Clean up any previous failed unit
|
||||
with suppress(DBusError):
|
||||
await self.sys_dbus.systemd.reset_failed_unit(FIREWALL_SERVICE)
|
||||
|
||||
properties: list[tuple[str, Variant]] = [
|
||||
("Description", Variant("s", "Supervisor gateway firewall rules")),
|
||||
("Type", Variant("s", "oneshot")),
|
||||
("ExecStart", Variant("a(sasb)", self._build_exec_start())),
|
||||
]
|
||||
|
||||
try:
|
||||
await self.sys_dbus.systemd.start_transient_unit(
|
||||
FIREWALL_SERVICE,
|
||||
StartUnitMode.REPLACE,
|
||||
properties,
|
||||
)
|
||||
except DBusError as err:
|
||||
_LOGGER.error("Failed to apply gateway firewall rules: %s", err)
|
||||
return False
|
||||
|
||||
# Wait for the oneshot unit to finish and verify it succeeded
|
||||
try:
|
||||
unit = await self.sys_dbus.systemd.get_unit(FIREWALL_SERVICE)
|
||||
async with (
|
||||
asyncio.timeout(FIREWALL_UNIT_TIMEOUT),
|
||||
unit.properties_changed() as signal,
|
||||
):
|
||||
state = await unit.get_active_state()
|
||||
while state not in TERMINAL_STATES:
|
||||
props = await signal.wait_for_signal()
|
||||
if (
|
||||
props[0] == DBUS_IFACE_SYSTEMD_UNIT
|
||||
and DBUS_ATTR_ACTIVE_STATE in props[1]
|
||||
):
|
||||
state = UnitActiveState(props[1][DBUS_ATTR_ACTIVE_STATE].value)
|
||||
except (DBusError, TimeoutError) as err:
|
||||
_LOGGER.error(
|
||||
"Failed waiting for gateway firewall unit to complete: %s", err
|
||||
)
|
||||
return False
|
||||
|
||||
if state == UnitActiveState.FAILED:
|
||||
_LOGGER.error(
|
||||
"Gateway firewall unit failed, iptables rules may not be applied"
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def apply_gateway_firewall_rules(self) -> None:
|
||||
"""Apply gateway firewall rules, marking unsupported on failure."""
|
||||
if await self._apply_gateway_firewall_rules():
|
||||
_LOGGER.info("Gateway firewall rules applied")
|
||||
else:
|
||||
self.sys_resolution.add_unhealthy_reason(
|
||||
UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED
|
||||
)
|
||||
@@ -15,6 +15,7 @@ from ..hardware.data import Device
|
||||
from .apparmor import AppArmorControl
|
||||
from .const import HostFeature
|
||||
from .control import SystemControl
|
||||
from .firewall import FirewallManager
|
||||
from .info import InfoCenter
|
||||
from .logs import LogsControl
|
||||
from .network import NetworkManager
|
||||
@@ -33,6 +34,7 @@ class HostManager(CoreSysAttributes):
|
||||
|
||||
self._apparmor: AppArmorControl = AppArmorControl(coresys)
|
||||
self._control: SystemControl = SystemControl(coresys)
|
||||
self._firewall: FirewallManager = FirewallManager(coresys)
|
||||
self._info: InfoCenter = InfoCenter(coresys)
|
||||
self._services: ServiceManager = ServiceManager(coresys)
|
||||
self._network: NetworkManager = NetworkManager(coresys)
|
||||
@@ -54,6 +56,11 @@ class HostManager(CoreSysAttributes):
|
||||
"""Return host control handler."""
|
||||
return self._control
|
||||
|
||||
@property
|
||||
def firewall(self) -> FirewallManager:
|
||||
"""Return host firewall handler."""
|
||||
return self._firewall
|
||||
|
||||
@property
|
||||
def info(self) -> InfoCenter:
|
||||
"""Return host info handler."""
|
||||
@@ -168,6 +175,9 @@ class HostManager(CoreSysAttributes):
|
||||
|
||||
await self.network.load()
|
||||
|
||||
# Apply firewall rules to restrict access to the Docker gateway
|
||||
await self.firewall.apply_gateway_firewall_rules()
|
||||
|
||||
# Register for events
|
||||
self.sys_bus.register_event(BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events)
|
||||
self.sys_bus.register_event(
|
||||
|
||||
@@ -65,6 +65,7 @@ class UnhealthyReason(StrEnum):
|
||||
"""Reasons for unsupported status."""
|
||||
|
||||
DOCKER = "docker"
|
||||
DOCKER_GATEWAY_UNPROTECTED = "docker_gateway_unprotected"
|
||||
DUPLICATE_OS_INSTALLATION = "duplicate_os_installation"
|
||||
OSERROR_BAD_MESSAGE = "oserror_bad_message"
|
||||
PRIVILEGED = "privileged"
|
||||
|
||||
@@ -27,7 +27,12 @@ from supervisor.exceptions import (
|
||||
DockerNotFound,
|
||||
)
|
||||
from supervisor.plugins.dns import PluginDns
|
||||
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
|
||||
from supervisor.resolution.const import (
|
||||
ContextType,
|
||||
IssueType,
|
||||
SuggestionType,
|
||||
UnhealthyReason,
|
||||
)
|
||||
from supervisor.resolution.data import Issue, Suggestion
|
||||
from supervisor.store.addon import AddonStore
|
||||
from supervisor.store.repository import RepositoryLocal
|
||||
@@ -128,6 +133,41 @@ async def test_image_added_removed_on_update(
|
||||
install.assert_not_called()
|
||||
|
||||
|
||||
async def test_addon_boot_skip_host_network_gateway_unprotected(
|
||||
coresys: CoreSys, install_addon_ssh: Addon
|
||||
):
|
||||
"""Test host network add-ons are skipped when gateway is unprotected."""
|
||||
install_addon_ssh.boot = AddonBoot.AUTO
|
||||
coresys.resolution.add_unhealthy_reason(UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED)
|
||||
with (
|
||||
patch.object(
|
||||
type(install_addon_ssh), "host_network", new=PropertyMock(return_value=True)
|
||||
),
|
||||
patch.object(Addon, "start") as start,
|
||||
):
|
||||
await coresys.addons.boot(AddonStartup.APPLICATION)
|
||||
start.assert_not_called()
|
||||
|
||||
|
||||
async def test_addon_boot_host_network_gateway_protected(
|
||||
coresys: CoreSys, install_addon_ssh: Addon
|
||||
):
|
||||
"""Test host network add-ons boot normally when gateway is protected."""
|
||||
install_addon_ssh.boot = AddonBoot.AUTO
|
||||
assert (
|
||||
UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED not in coresys.resolution.unhealthy
|
||||
)
|
||||
with (
|
||||
patch.object(
|
||||
type(install_addon_ssh), "host_network", new=PropertyMock(return_value=True)
|
||||
),
|
||||
patch.object(Addon, "start", return_value=asyncio.Future()) as start,
|
||||
):
|
||||
start.return_value.set_result(None)
|
||||
await coresys.addons.boot(AddonStartup.APPLICATION)
|
||||
start.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("err", [DockerAPIError, DockerNotFound])
|
||||
async def test_addon_boot_system_error(
|
||||
coresys: CoreSys, install_addon_ssh: Addon, capture_exception: Mock, err
|
||||
|
||||
@@ -473,6 +473,18 @@ async def fixture_all_dbus_services(
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _mock_firewall():
|
||||
"""Mock out firewall rules by default to avoid dbus signal timeouts."""
|
||||
patcher = patch(
|
||||
"supervisor.host.firewall.FirewallManager.apply_gateway_firewall_rules",
|
||||
new_callable=AsyncMock,
|
||||
)
|
||||
patcher.start()
|
||||
yield patcher
|
||||
patcher.stop()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def coresys(
|
||||
docker: DockerAPI,
|
||||
|
||||
220
tests/host/test_firewall.py
Normal file
220
tests/host/test_firewall.py
Normal file
@@ -0,0 +1,220 @@
|
||||
"""Test host firewall manager."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import patch
|
||||
|
||||
from dbus_fast import DBusError, ErrorType
|
||||
import pytest
|
||||
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.dbus.const import StartUnitMode
|
||||
from supervisor.host.firewall import (
|
||||
BIN_SH,
|
||||
FIREWALL_SERVICE,
|
||||
IP6TABLES_CMD,
|
||||
IPTABLES_CMD,
|
||||
FirewallManager,
|
||||
)
|
||||
from supervisor.resolution.const import UnhealthyReason
|
||||
|
||||
from tests.dbus_service_mocks.base import DBusServiceMock
|
||||
from tests.dbus_service_mocks.systemd import Systemd as SystemdService
|
||||
from tests.dbus_service_mocks.systemd_unit import SystemdUnit as SystemdUnitService
|
||||
|
||||
GATEWAY_IPV4 = "172.30.32.1"
|
||||
GATEWAY_IPV6 = "fd0c:ac1e:2100::1"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _unmock_firewall(_mock_firewall):
|
||||
"""Undo the global firewall mock so actual firewall code runs."""
|
||||
_mock_firewall.stop()
|
||||
yield
|
||||
_mock_firewall.start()
|
||||
|
||||
|
||||
@pytest.fixture(name="systemd_service")
|
||||
async def fixture_systemd_service(
|
||||
all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
|
||||
) -> SystemdService:
|
||||
"""Return systemd service mock."""
|
||||
yield all_dbus_services["systemd"]
|
||||
|
||||
|
||||
@pytest.fixture(name="systemd_unit_service")
|
||||
async def fixture_systemd_unit_service(
|
||||
all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
|
||||
) -> SystemdUnitService:
|
||||
"""Return systemd unit service mock."""
|
||||
yield all_dbus_services["systemd_unit"]
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules(
|
||||
coresys: CoreSys,
|
||||
systemd_service: SystemdService,
|
||||
systemd_unit_service: SystemdUnitService,
|
||||
):
|
||||
"""Test gateway firewall rules are applied."""
|
||||
systemd_service.StartTransientUnit.calls.clear()
|
||||
systemd_service.ResetFailedUnit.calls.clear()
|
||||
systemd_unit_service.active_state = "inactive"
|
||||
|
||||
await coresys.host.firewall.apply_gateway_firewall_rules()
|
||||
|
||||
assert (
|
||||
UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED not in coresys.resolution.unhealthy
|
||||
)
|
||||
assert len(systemd_service.StartTransientUnit.calls) == 1
|
||||
call = systemd_service.StartTransientUnit.calls[0]
|
||||
assert call[0] == FIREWALL_SERVICE
|
||||
assert call[1] == StartUnitMode.REPLACE
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules_exec_start_rules(coresys: CoreSys):
|
||||
"""Test correct iptables rules are generated."""
|
||||
entries = FirewallManager._build_exec_start()
|
||||
|
||||
# 4 entries: 2 per IP version (DROP + ACCEPT)
|
||||
assert len(entries) == 4
|
||||
|
||||
# IPv4 DROP rule
|
||||
assert entries[0].binary == BIN_SH
|
||||
assert entries[0].argv == [
|
||||
BIN_SH,
|
||||
"-c",
|
||||
f"{IPTABLES_CMD} -t raw -C PREROUTING ! -i hassio -d {GATEWAY_IPV4}"
|
||||
f" -j DROP 2>/dev/null"
|
||||
f" || {IPTABLES_CMD} -t raw -I PREROUTING ! -i hassio -d {GATEWAY_IPV4}"
|
||||
f" -j DROP",
|
||||
]
|
||||
assert entries[0].ignore_failure is False
|
||||
|
||||
# IPv4 ACCEPT rule
|
||||
assert entries[1].binary == BIN_SH
|
||||
assert entries[1].argv == [
|
||||
BIN_SH,
|
||||
"-c",
|
||||
f"{IPTABLES_CMD} -t raw -C PREROUTING -i lo -d {GATEWAY_IPV4}"
|
||||
f" -j ACCEPT 2>/dev/null"
|
||||
f" || {IPTABLES_CMD} -t raw -I PREROUTING -i lo -d {GATEWAY_IPV4}"
|
||||
f" -j ACCEPT",
|
||||
]
|
||||
assert entries[1].ignore_failure is False
|
||||
|
||||
# IPv6 DROP rule
|
||||
assert entries[2].binary == BIN_SH
|
||||
assert entries[2].argv == [
|
||||
BIN_SH,
|
||||
"-c",
|
||||
f"{IP6TABLES_CMD} -t raw -C PREROUTING ! -i hassio -d {GATEWAY_IPV6}"
|
||||
f" -j DROP 2>/dev/null"
|
||||
f" || {IP6TABLES_CMD} -t raw -I PREROUTING ! -i hassio -d {GATEWAY_IPV6}"
|
||||
f" -j DROP",
|
||||
]
|
||||
assert entries[2].ignore_failure is False
|
||||
|
||||
# IPv6 ACCEPT rule
|
||||
assert entries[3].binary == BIN_SH
|
||||
assert entries[3].argv == [
|
||||
BIN_SH,
|
||||
"-c",
|
||||
f"{IP6TABLES_CMD} -t raw -C PREROUTING -i lo -d {GATEWAY_IPV6}"
|
||||
f" -j ACCEPT 2>/dev/null"
|
||||
f" || {IP6TABLES_CMD} -t raw -I PREROUTING -i lo -d {GATEWAY_IPV6}"
|
||||
f" -j ACCEPT",
|
||||
]
|
||||
assert entries[3].ignore_failure is False
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules_systemd_not_connected(
|
||||
coresys: CoreSys, systemd_service: SystemdService
|
||||
):
|
||||
"""Test unsupported reason added when systemd is not available."""
|
||||
systemd_service.StartTransientUnit.calls.clear()
|
||||
|
||||
with patch.object(
|
||||
type(coresys.dbus.systemd),
|
||||
"is_connected",
|
||||
new_callable=lambda: property(lambda self: False),
|
||||
):
|
||||
await coresys.host.firewall.apply_gateway_firewall_rules()
|
||||
|
||||
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy
|
||||
assert len(systemd_service.StartTransientUnit.calls) == 0
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules_dbus_error(
|
||||
coresys: CoreSys, systemd_service: SystemdService
|
||||
):
|
||||
"""Test unsupported reason added when transient unit fails."""
|
||||
systemd_service.response_start_transient_unit = DBusError(
|
||||
ErrorType.SERVICE_ERROR, "test error"
|
||||
)
|
||||
|
||||
await coresys.host.firewall.apply_gateway_firewall_rules()
|
||||
|
||||
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules_resets_failed_unit(
|
||||
coresys: CoreSys,
|
||||
systemd_service: SystemdService,
|
||||
systemd_unit_service: SystemdUnitService,
|
||||
):
|
||||
"""Test that previous failed unit is reset before applying."""
|
||||
systemd_service.ResetFailedUnit.calls.clear()
|
||||
systemd_unit_service.active_state = "inactive"
|
||||
|
||||
await coresys.host.firewall.apply_gateway_firewall_rules()
|
||||
|
||||
assert len(systemd_service.ResetFailedUnit.calls) == 1
|
||||
assert systemd_service.ResetFailedUnit.calls[0] == (FIREWALL_SERVICE,)
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules_properties(
|
||||
coresys: CoreSys,
|
||||
systemd_service: SystemdService,
|
||||
systemd_unit_service: SystemdUnitService,
|
||||
):
|
||||
"""Test transient unit has correct properties."""
|
||||
systemd_service.StartTransientUnit.calls.clear()
|
||||
systemd_unit_service.active_state = "inactive"
|
||||
|
||||
await coresys.host.firewall.apply_gateway_firewall_rules()
|
||||
|
||||
call = systemd_service.StartTransientUnit.calls[0]
|
||||
properties = {prop[0]: prop[1] for prop in call[2]}
|
||||
|
||||
assert properties["Description"].value == "Supervisor gateway firewall rules"
|
||||
assert properties["Type"].value == "oneshot"
|
||||
assert properties["ExecStart"].signature == "a(sasb)"
|
||||
assert len(properties["ExecStart"].value) == 4
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules_unit_failed(
|
||||
coresys: CoreSys,
|
||||
systemd_service: SystemdService,
|
||||
systemd_unit_service: SystemdUnitService,
|
||||
):
|
||||
"""Test unsupported reason added when firewall unit fails."""
|
||||
systemd_unit_service.active_state = "failed"
|
||||
|
||||
await coresys.host.firewall.apply_gateway_firewall_rules()
|
||||
|
||||
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy
|
||||
|
||||
|
||||
async def test_apply_gateway_firewall_rules_unit_failed_via_signal(
|
||||
coresys: CoreSys,
|
||||
systemd_service: SystemdService,
|
||||
systemd_unit_service: SystemdUnitService,
|
||||
):
|
||||
"""Test failure detected via property change signal when unit is still activating."""
|
||||
systemd_unit_service.active_state = "activating"
|
||||
|
||||
task = asyncio.create_task(coresys.host.firewall.apply_gateway_firewall_rules())
|
||||
await asyncio.sleep(0.1)
|
||||
systemd_unit_service.emit_properties_changed({"ActiveState": "failed"})
|
||||
await task
|
||||
|
||||
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy
|
||||
Reference in New Issue
Block a user