mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-02 08:12:47 +01:00
Since systemd is not guaranteed to be available in the devcontainer to create the transient unit, we need to skip creating the firewall rules when Supervisor is started in dev mode. See: https://github.com/home-assistant/supervisor/pull/6650#issuecomment-4097583552
241 lines
7.9 KiB
Python
241 lines
7.9 KiB
Python
"""Test host firewall manager."""
|
|
|
|
import asyncio
|
|
import os
|
|
from unittest.mock import patch
|
|
|
|
from dbus_fast import DBusError, ErrorType
|
|
import pytest
|
|
|
|
from supervisor.coresys import CoreSys
|
|
from supervisor.dbus.const import StartUnitMode
|
|
from supervisor.host.firewall import (
|
|
BIN_SH,
|
|
FIREWALL_SERVICE,
|
|
IP6TABLES_CMD,
|
|
IPTABLES_CMD,
|
|
FirewallManager,
|
|
)
|
|
from supervisor.resolution.const import UnhealthyReason
|
|
|
|
from tests.dbus_service_mocks.base import DBusServiceMock
|
|
from tests.dbus_service_mocks.systemd import Systemd as SystemdService
|
|
from tests.dbus_service_mocks.systemd_unit import SystemdUnit as SystemdUnitService
|
|
|
|
GATEWAY_IPV4 = "172.30.32.1"
|
|
GATEWAY_IPV6 = "fd0c:ac1e:2100::1"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _unmock_firewall(_mock_firewall):
    """Switch off the global firewall mock while a test in this module runs.

    The mock is stopped before the test body so the actual firewall code is
    exercised, and started again once the test has finished.
    """
    _mock_firewall.stop()
    yield
    _mock_firewall.start()
|
|
|
|
|
|
@pytest.fixture(name="systemd_service")
async def fixture_systemd_service(
    all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> SystemdService:
    """Provide the mock registered under ``"systemd"`` in ``all_dbus_services``."""
    systemd_mock = all_dbus_services["systemd"]
    yield systemd_mock
|
|
|
|
|
|
@pytest.fixture(name="systemd_unit_service")
async def fixture_systemd_unit_service(
    all_dbus_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
) -> SystemdUnitService:
    """Provide the mock registered under ``"systemd_unit"`` in ``all_dbus_services``."""
    unit_mock = all_dbus_services["systemd_unit"]
    yield unit_mock
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules(
    coresys: CoreSys,
    systemd_service: SystemdService,
    systemd_unit_service: SystemdUnitService,
):
    """Test that applying the rules starts the firewall transient unit.

    With the unit reported as inactive, exactly one ``StartTransientUnit``
    call is expected, targeting ``FIREWALL_SERVICE`` in ``REPLACE`` mode, and
    the gateway must not be flagged as unprotected.
    """
    # Start from a clean call log so only this test's calls are counted.
    systemd_service.StartTransientUnit.calls.clear()
    systemd_service.ResetFailedUnit.calls.clear()
    systemd_unit_service.active_state = "inactive"

    await coresys.host.firewall.apply_gateway_firewall_rules()

    unhealthy_reasons = coresys.resolution.unhealthy
    assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED not in unhealthy_reasons

    recorded_starts = systemd_service.StartTransientUnit.calls
    assert len(recorded_starts) == 1

    start_args = recorded_starts[0]
    assert start_args[0] == FIREWALL_SERVICE
    assert start_args[1] == StartUnitMode.REPLACE
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_dev_mode(
    coresys: CoreSys,
    systemd_service: SystemdService,
):
    """Test that no firewall unit is created when ``SUPERVISOR_DEV`` is ``"1"``.

    Neither ``StartTransientUnit`` nor ``ResetFailedUnit`` may be called, and
    the gateway must not be flagged as unprotected.
    """
    dev_environment = {"SUPERVISOR_DEV": "1"}

    with patch.dict(os.environ, dev_environment):
        # Start from a clean call log so only calls made below are counted.
        systemd_service.StartTransientUnit.calls.clear()
        systemd_service.ResetFailedUnit.calls.clear()

        await coresys.host.firewall.apply_gateway_firewall_rules()

        unhealthy_reasons = coresys.resolution.unhealthy
        assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED not in unhealthy_reasons
        assert len(systemd_service.StartTransientUnit.calls) == 0
        assert len(systemd_service.ResetFailedUnit.calls) == 0
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_exec_start_rules(coresys: CoreSys):
    """Test the ExecStart entries built by ``FirewallManager._build_exec_start``.

    Four entries are expected, in the order IPv4 DROP, IPv4 ACCEPT, IPv6 DROP,
    IPv6 ACCEPT. Each runs ``BIN_SH -c`` with a command that checks for the
    rule (``-C``) and falls back to inserting it (``-I``), and none of the
    entries may ignore failures.
    """
    entries = FirewallManager._build_exec_start()

    # (iptables binary, rule specification), listed in the expected entry order.
    expected_rules = [
        (IPTABLES_CMD, f"! -i hassio -d {GATEWAY_IPV4} -j DROP"),
        (IPTABLES_CMD, f"-i lo -d {GATEWAY_IPV4} -j ACCEPT"),
        (IP6TABLES_CMD, f"! -i hassio -d {GATEWAY_IPV6} -j DROP"),
        (IP6TABLES_CMD, f"-i lo -d {GATEWAY_IPV6} -j ACCEPT"),
    ]

    # 4 entries: 2 per IP version (DROP + ACCEPT)
    assert len(entries) == 4

    for entry, (tables_cmd, rule) in zip(entries, expected_rules):
        shell_command = (
            f"{tables_cmd} -t raw -C PREROUTING {rule} 2>/dev/null"
            f" || {tables_cmd} -t raw -I PREROUTING {rule}"
        )
        assert entry.binary == BIN_SH
        assert entry.argv == [BIN_SH, "-c", shell_command]
        assert entry.ignore_failure is False
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_systemd_not_connected(
    coresys: CoreSys, systemd_service: SystemdService
):
    """Test unhealthy reason is added when systemd reports it is not connected.

    ``is_connected`` is patched to ``False`` on the systemd D-Bus class; the
    gateway must then be flagged as unprotected without any
    ``StartTransientUnit`` call being made.
    """
    systemd_service.StartTransientUnit.calls.clear()

    systemd_cls = type(coresys.dbus.systemd)
    with patch.object(
        systemd_cls,
        "is_connected",
        # Replace the attribute on the class with a property that is always False.
        new_callable=lambda: property(lambda self: False),
    ):
        await coresys.host.firewall.apply_gateway_firewall_rules()

    unhealthy_reasons = coresys.resolution.unhealthy
    assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in unhealthy_reasons
    assert len(systemd_service.StartTransientUnit.calls) == 0
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_dbus_error(
    coresys: CoreSys, systemd_service: SystemdService
):
    """Test unhealthy reason is added when starting the transient unit errors.

    The systemd mock is configured to answer ``StartTransientUnit`` with a
    ``DBusError``; the gateway must then be flagged as unprotected.
    """
    start_failure = DBusError(ErrorType.SERVICE_ERROR, "test error")
    systemd_service.response_start_transient_unit = start_failure

    await coresys.host.firewall.apply_gateway_firewall_rules()

    unhealthy_reasons = coresys.resolution.unhealthy
    assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in unhealthy_reasons
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_resets_failed_unit(
    coresys: CoreSys,
    systemd_service: SystemdService,
    systemd_unit_service: SystemdUnitService,
):
    """Test that applying the rules resets the firewall unit's failed state.

    Exactly one ``ResetFailedUnit`` call is expected, with
    ``FIREWALL_SERVICE`` as its only argument.
    """
    # Start from a clean call log so only this test's calls are counted.
    systemd_service.ResetFailedUnit.calls.clear()
    systemd_unit_service.active_state = "inactive"

    await coresys.host.firewall.apply_gateway_firewall_rules()

    recorded_resets = systemd_service.ResetFailedUnit.calls
    assert len(recorded_resets) == 1
    assert recorded_resets[0] == (FIREWALL_SERVICE,)
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_properties(
    coresys: CoreSys,
    systemd_service: SystemdService,
    systemd_unit_service: SystemdUnitService,
):
    """Test transient unit has correct properties.

    Checks the ``Description``, ``Type`` and ``ExecStart`` properties passed
    as the third argument of the recorded ``StartTransientUnit`` call.
    """
    # Start from a clean call log so index 0 is the call made by this test.
    systemd_service.StartTransientUnit.calls.clear()
    systemd_unit_service.active_state = "inactive"

    await coresys.host.firewall.apply_gateway_firewall_rules()

    # Fail with a clear assertion, not an IndexError, if the unit was never
    # started (mirrors the check in test_apply_gateway_firewall_rules).
    assert len(systemd_service.StartTransientUnit.calls) == 1
    call = systemd_service.StartTransientUnit.calls[0]

    # call[2] holds the unit properties as (name, value) pairs.
    properties = {prop[0]: prop[1] for prop in call[2]}

    assert properties["Description"].value == "Supervisor gateway firewall rules"
    assert properties["Type"].value == "oneshot"
    assert properties["ExecStart"].signature == "a(sasb)"
    assert len(properties["ExecStart"].value) == 4
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_unit_failed(
    coresys: CoreSys,
    systemd_service: SystemdService,
    systemd_unit_service: SystemdUnitService,
):
    """Test unhealthy reason is added when the firewall unit is in failed state.

    The unit mock reports ``ActiveState`` ``"failed"``; the gateway must then
    be flagged as unprotected.
    """
    systemd_unit_service.active_state = "failed"

    await coresys.host.firewall.apply_gateway_firewall_rules()

    unhealthy_reasons = coresys.resolution.unhealthy
    assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in unhealthy_reasons
|
|
|
|
|
|
async def test_apply_gateway_firewall_rules_unit_failed_via_signal(
    coresys: CoreSys,
    systemd_service: SystemdService,
    systemd_unit_service: SystemdUnitService,
):
    """Test a failure reported by a properties-changed signal is detected.

    The unit starts out as ``"activating"``; while the apply call is running,
    the mock emits ``ActiveState`` ``"failed"``, after which the gateway must
    be flagged as unprotected.
    """
    systemd_unit_service.active_state = "activating"

    # Run the apply call in the background so the signal can be emitted
    # while it is still in progress.
    apply_task = asyncio.create_task(
        coresys.host.firewall.apply_gateway_firewall_rules()
    )
    await asyncio.sleep(0.1)
    systemd_unit_service.emit_properties_changed({"ActiveState": "failed"})
    await apply_task

    unhealthy_reasons = coresys.resolution.unhealthy
    assert UnhealthyReason.DOCKER_GATEWAY_UNPROTECTED in unhealthy_reasons
|