1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-04-02 00:07:16 +01:00
Files
supervisor/tests/host/test_firewall.py
Jan Čermák 0cb96c36b6 Disable hassio gateway protection rules in dev mode (#6658)
Since there's no Systemd guaranteed to be available for Devcontainer to create
the transient unit, we need to skip creating of the firewall rules when
Supervisor is started in dev mode.

See: https://github.com/home-assistant/supervisor/pull/6650#issuecomment-4097583552
2026-03-23 10:13:31 +01:00

241 lines
7.9 KiB
Python

"""Test host firewall manager."""
import asyncio
import os
from unittest.mock import patch
from dbus_fast import DBusError, ErrorType
import pytest
from supervisor.coresys import CoreSys
from supervisor.dbus.const import StartUnitMode
from supervisor.host.firewall import (
BIN_SH,
FIREWALL_SERVICE,
IP6TABLES_CMD,
IPTABLES_CMD,
FirewallManager,
)
from supervisor.resolution.const import UnhealthyReason
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.systemd import Systemd as SystemdService
from tests.dbus_service_mocks.systemd_unit import SystemdUnit as SystemdUnitService
GATEWAY_IPV4 = "172.30.32.1"
GATEWAY_IPV6 = "fd0c:ac1e:2100::1"
@pytest.fixture(autouse=True)
def _unmock_firewall(_mock_firewall):
"""Undo the global firewall mock so actual firewall code runs."""
_mock_firewall.stop()
yield
_mock_firewall.start()
@pytest.fixture(name="systemd_service")
async def fixture_systemd_service(
all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> SystemdService:
"""Return systemd service mock."""
yield all_dbus_services["systemd"]
@pytest.fixture(name="systemd_unit_service")
async def fixture_systemd_unit_service(
all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> SystemdUnitService:
"""Return systemd unit service mock."""
yield all_dbus_services["systemd_unit"]
async def test_apply_gateway_firewall_rules(
coresys: CoreSys,
systemd_service: SystemdService,
systemd_unit_service: SystemdUnitService,
):
"""Test gateway firewall rules are applied."""
systemd_service.StartTransientUnit.calls.clear()
systemd_service.ResetFailedUnit.calls.clear()
systemd_unit_service.active_state = "inactive"
await coresys.host.firewall.apply_gateway_firewall_rules()
assert (
UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED not in coresys.resolution.unhealthy
)
assert len(systemd_service.StartTransientUnit.calls) == 1
call = systemd_service.StartTransientUnit.calls[0]
assert call[0] == FIREWALL_SERVICE
assert call[1] == StartUnitMode.REPLACE
async def test_apply_gateway_firewall_rules_dev_mode(
coresys: CoreSys,
systemd_service: SystemdService,
):
"""Test gateway firewall rules are skipped in development mode."""
with patch.dict(os.environ, {"SUPERVISOR_DEV": "1"}):
systemd_service.StartTransientUnit.calls.clear()
systemd_service.ResetFailedUnit.calls.clear()
await coresys.host.firewall.apply_gateway_firewall_rules()
assert (
UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED
not in coresys.resolution.unhealthy
)
assert len(systemd_service.StartTransientUnit.calls) == 0
assert len(systemd_service.ResetFailedUnit.calls) == 0
async def test_apply_gateway_firewall_rules_exec_start_rules(coresys: CoreSys):
"""Test correct iptables rules are generated."""
entries = FirewallManager._build_exec_start()
# 4 entries: 2 per IP version (DROP + ACCEPT)
assert len(entries) == 4
# IPv4 DROP rule
assert entries[0].binary == BIN_SH
assert entries[0].argv == [
BIN_SH,
"-c",
f"{IPTABLES_CMD} -t raw -C PREROUTING ! -i hassio -d {GATEWAY_IPV4}"
f" -j DROP 2>/dev/null"
f" || {IPTABLES_CMD} -t raw -I PREROUTING ! -i hassio -d {GATEWAY_IPV4}"
f" -j DROP",
]
assert entries[0].ignore_failure is False
# IPv4 ACCEPT rule
assert entries[1].binary == BIN_SH
assert entries[1].argv == [
BIN_SH,
"-c",
f"{IPTABLES_CMD} -t raw -C PREROUTING -i lo -d {GATEWAY_IPV4}"
f" -j ACCEPT 2>/dev/null"
f" || {IPTABLES_CMD} -t raw -I PREROUTING -i lo -d {GATEWAY_IPV4}"
f" -j ACCEPT",
]
assert entries[1].ignore_failure is False
# IPv6 DROP rule
assert entries[2].binary == BIN_SH
assert entries[2].argv == [
BIN_SH,
"-c",
f"{IP6TABLES_CMD} -t raw -C PREROUTING ! -i hassio -d {GATEWAY_IPV6}"
f" -j DROP 2>/dev/null"
f" || {IP6TABLES_CMD} -t raw -I PREROUTING ! -i hassio -d {GATEWAY_IPV6}"
f" -j DROP",
]
assert entries[2].ignore_failure is False
# IPv6 ACCEPT rule
assert entries[3].binary == BIN_SH
assert entries[3].argv == [
BIN_SH,
"-c",
f"{IP6TABLES_CMD} -t raw -C PREROUTING -i lo -d {GATEWAY_IPV6}"
f" -j ACCEPT 2>/dev/null"
f" || {IP6TABLES_CMD} -t raw -I PREROUTING -i lo -d {GATEWAY_IPV6}"
f" -j ACCEPT",
]
assert entries[3].ignore_failure is False
async def test_apply_gateway_firewall_rules_systemd_not_connected(
coresys: CoreSys, systemd_service: SystemdService
):
"""Test unsupported reason added when systemd is not available."""
systemd_service.StartTransientUnit.calls.clear()
with patch.object(
type(coresys.dbus.systemd),
"is_connected",
new_callable=lambda: property(lambda self: False),
):
await coresys.host.firewall.apply_gateway_firewall_rules()
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy
assert len(systemd_service.StartTransientUnit.calls) == 0
async def test_apply_gateway_firewall_rules_dbus_error(
coresys: CoreSys, systemd_service: SystemdService
):
"""Test unsupported reason added when transient unit fails."""
systemd_service.response_start_transient_unit = DBusError(
ErrorType.SERVICE_ERROR, "test error"
)
await coresys.host.firewall.apply_gateway_firewall_rules()
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy
async def test_apply_gateway_firewall_rules_resets_failed_unit(
coresys: CoreSys,
systemd_service: SystemdService,
systemd_unit_service: SystemdUnitService,
):
"""Test that previous failed unit is reset before applying."""
systemd_service.ResetFailedUnit.calls.clear()
systemd_unit_service.active_state = "inactive"
await coresys.host.firewall.apply_gateway_firewall_rules()
assert len(systemd_service.ResetFailedUnit.calls) == 1
assert systemd_service.ResetFailedUnit.calls[0] == (FIREWALL_SERVICE,)
async def test_apply_gateway_firewall_rules_properties(
coresys: CoreSys,
systemd_service: SystemdService,
systemd_unit_service: SystemdUnitService,
):
"""Test transient unit has correct properties."""
systemd_service.StartTransientUnit.calls.clear()
systemd_unit_service.active_state = "inactive"
await coresys.host.firewall.apply_gateway_firewall_rules()
call = systemd_service.StartTransientUnit.calls[0]
properties = {prop[0]: prop[1] for prop in call[2]}
assert properties["Description"].value == "Supervisor gateway firewall rules"
assert properties["Type"].value == "oneshot"
assert properties["ExecStart"].signature == "a(sasb)"
assert len(properties["ExecStart"].value) == 4
async def test_apply_gateway_firewall_rules_unit_failed(
coresys: CoreSys,
systemd_service: SystemdService,
systemd_unit_service: SystemdUnitService,
):
"""Test unsupported reason added when firewall unit fails."""
systemd_unit_service.active_state = "failed"
await coresys.host.firewall.apply_gateway_firewall_rules()
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy
async def test_apply_gateway_firewall_rules_unit_failed_via_signal(
coresys: CoreSys,
systemd_service: SystemdService,
systemd_unit_service: SystemdUnitService,
):
"""Test failure detected via property change signal when unit is still activating."""
systemd_unit_service.active_state = "activating"
task = asyncio.create_task(coresys.host.firewall.apply_gateway_firewall_rules())
await asyncio.sleep(0.1)
systemd_unit_service.emit_properties_changed({"ActiveState": "failed"})
await task
assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in coresys.resolution.unhealthy