1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-07-03 11:55:38 +01:00
Files
supervisor/tests/api/middleware/test_security.py
T
Stefan Agner ed91b18c4b tests: enable flake8-pytest-style (PT) ruff rules (#6857)
* tests: enable flake8-pytest-style (PT) ruff rules

Enable the `PT` ruff rule set and fix the resulting violations across the
test suite:

- PT006: pass parametrize argument names as tuples instead of a single
  comma-separated string.
- PT022: switch fixtures that have no teardown from `yield` to `return`
  so the lack of cleanup is obvious at a glance.
- PT011: add `match=` to broad `pytest.raises(ValueError)` blocks so the
  expected error is anchored to a specific message.
- PT012: hoist setup (patches, branching) out of `pytest.raises()`
  blocks so only the call that is expected to raise remains inside.
- PT013: replace `from pytest import X` with `import pytest` and access
  attributes via the module.
- PT015: replace `try/except` + `assert False` patterns with
  `pytest.raises(...)`.
- PT017: replace `assert` on exceptions inside `except` blocks with
  `pytest.raises(...) as exc_info` and assert on `exc_info.value`.

No behavioral changes to the tests; the full suite still passes.

* tests: address review feedback on PT ruff rule enablement

- Fix fixture return-type annotations after switching `yield` to `return`
  in tests/conftest.py: drop the `Generator[...]`/`AsyncGenerator[...]`
  wrapper for `dns_manager_service`, `supervisor_internet`, `websession`,
  and `mock_update_data` so the annotation matches what the fixture
  actually returns.
- Correct the return-type annotation of `fixture_ip6config_service` from
  `IP4ConfigService` to `IP6ConfigService`.
- Fix recurring "excepiton" typo in tests/utils/test_exception_helper.py.

* tests: verify backup cleanup on permission error

After `test_new_backup_permission_error` raises `BackupPermissionError`,
assert that no tarfile was left behind and `tmp_path` is empty. The
previous version only checked that the exception was raised, which
missed any regression where a partial tarfile would survive the failed
create.

* tests: rename DNS_GOOD_V6 to DNS_V6_UNSUPPORTED

The constant was named "good" but its tests assert that the URLs are
rejected by the DNS validator. The IPv6 URLs are well-formed but
currently rejected because IPv6 doesn't work with the Docker network
(see `dns_url` in supervisor/validate.py). Rename the constant and the
related test to make the intent obvious.
2026-05-20 22:17:54 +02:00

226 lines
7.9 KiB
Python

"""Test API security layer."""
import asyncio
from http import HTTPStatus
from unittest.mock import patch
from aiohttp import web
from aiohttp.test_utils import TestClient
import pytest
import urllib3
from supervisor.api import RestAPI
from supervisor.apps.app import App
from supervisor.const import ROLE_ALL, CoreState
from supervisor.coresys import CoreSys
# pylint: disable=redefined-outer-name
async def mock_handler(request):
"""Return OK."""
return web.Response(text="OK")
@pytest.fixture
async def api_system(aiohttp_client, coresys: CoreSys) -> TestClient:
"""Fixture for RestAPI client."""
api = RestAPI(coresys)
api.webapp = web.Application()
with patch("supervisor.docker.supervisor.os") as os:
os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"}
await api.load()
api.webapp.middlewares.append(api.security.block_bad_requests)
api.webapp.middlewares.append(api.security.system_validation)
api.webapp.router.add_get("/{all:.*}", mock_handler)
return await aiohttp_client(api.webapp)
@pytest.fixture
async def api_token_validation(aiohttp_client, coresys: CoreSys) -> TestClient:
"""Fixture for RestAPI client with token validation middleware."""
api = RestAPI(coresys)
api.webapp = web.Application()
with patch("supervisor.docker.supervisor.os") as os:
os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"}
await api.start()
api.webapp.middlewares.append(api.security.token_validation)
api.webapp.router.add_get("/{all:.*}", mock_handler)
api.webapp.router.add_post("/{all:.*}", mock_handler)
api.webapp.router.add_delete("/{all:.*}", mock_handler)
return await aiohttp_client(api.webapp)
@pytest.fixture(name="plugin_tokens")
async def fixture_plugin_tokens(coresys: CoreSys) -> None:
"""Mock plugin tokens used in middleware."""
# pylint: disable=protected-access
coresys.plugins.cli._data["access_token"] = "c_123456"
coresys.plugins.observer._data["access_token"] = "o_123456"
# pylint: enable=protected-access
async def test_api_security_system_initialize(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.INITIALIZE)
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
assert resp.status == 400
assert result["result"] == "error"
async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.SETUP)
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
assert resp.status == 400
assert result["result"] == "error"
async def test_api_security_system_running(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.RUNNING)
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200
async def test_api_security_system_startup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.STARTUP)
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200
@pytest.mark.parametrize(
("request_path", "request_params", "fail_on_query_string"),
[
("/proc/self/environ", {}, False),
("/", {"test": "/test/../../api"}, True),
("/", {"test": "test/../../api"}, True),
("/", {"test": "/test/%2E%2E%2f%2E%2E%2fapi"}, True),
("/", {"test": "test/%2E%2E%2f%2E%2E%2fapi"}, True),
("/", {"test": "test/%252E%252E/api"}, True),
("/", {"test": "test/%252E%252E%2fapi"}, True),
(
"/",
{"test": "test/%2525252E%2525252E%2525252f%2525252E%2525252E%2525252fapi"},
True,
),
("/test/.%252E/api", {}, False),
("/test/%252E%252E/api", {}, False),
("/test/%2E%2E%2f%2E%2E%2fapi", {}, False),
("/test/%2525252E%2525252E%2525252f%2525252E%2525252E/api", {}, False),
("/", {"sql": ";UNION SELECT (a, b"}, True),
("/", {"sql": "UNION%20SELECT%20%28a%2C%20b"}, True),
("/UNION%20SELECT%20%28a%2C%20b", {}, False),
("/", {"sql": "concat(..."}, True),
("/", {"xss": "<script >"}, True),
("/<script >", {"xss": ""}, False),
("/%3Cscript%3E", {}, False),
],
)
async def test_bad_requests(
request_path: str,
request_params: dict[str, str],
fail_on_query_string: bool,
api_system: TestClient,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test request paths that should be filtered."""
# Manual params handling
if request_params:
raw_params = "&".join(f"{val}={key}" for val, key in request_params.items())
man_params = f"?{raw_params}"
else:
man_params = ""
http = urllib3.PoolManager()
resp = await asyncio.get_running_loop().run_in_executor(
None,
http.request,
"GET",
f"http://{api_system.host}:{api_system.port}{request_path}{man_params}",
request_params,
)
assert resp.status == HTTPStatus.BAD_REQUEST
message = "Filtered a potential harmful request to:"
if fail_on_query_string:
message = "Filtered a request with a potential harmful query string:"
assert message in caplog.text
@pytest.mark.parametrize(
("request_method", "request_path", "success_roles"),
[
("post", "/auth/reset", {"admin"}),
("get", "/auth/list", {"admin"}),
("delete", "/auth/cache", {"admin", "manager"}),
("get", "/auth", set(ROLE_ALL)),
("post", "/auth", set(ROLE_ALL)),
("get", "/backups/info", set(ROLE_ALL)),
("get", "/backups/abc123/download", {"admin", "manager", "backup"}),
("post", "/backups/new/full", {"admin", "manager", "backup"}),
("post", "/backups/abc123/restore/full", {"admin", "manager", "backup"}),
("get", "/core/info", set(ROLE_ALL)),
("post", "/core/update", {"admin", "manager", "homeassistant"}),
("post", "/core/restart", {"admin", "manager", "homeassistant"}),
("get", "/addons/self/options/config", set(ROLE_ALL)),
("post", "/addons/self/options", set(ROLE_ALL)),
("post", "/addons/self/restart", set(ROLE_ALL)),
("post", "/addons/self/security", {"admin"}),
("get", "/addons/abc123/options/config", {"admin", "manager"}),
("post", "/addons/abc123/options", {"admin", "manager"}),
("post", "/addons/abc123/restart", {"admin", "manager"}),
("post", "/addons/abc123/security", {"admin"}),
("post", "/os/datadisk/wipe", {"admin"}),
("post", "/addons/self/sys_options", set()),
("post", "/addons/abc123/sys_options", set()),
],
)
@pytest.mark.usefixtures("plugin_tokens")
async def test_token_validation(
api_token_validation: TestClient,
install_app_example: App,
request_method: str,
request_path: str,
success_roles: set[str],
):
"""Test token validation paths."""
install_app_example.persist["access_token"] = "abc123"
install_app_example.data["hassio_api"] = True
for role in success_roles:
install_app_example.data["hassio_role"] = role
resp = await getattr(api_token_validation, request_method)(
request_path, headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 200
for role in set(ROLE_ALL) - success_roles:
install_app_example.data["hassio_role"] = role
resp = await getattr(api_token_validation, request_method)(
request_path, headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 403
@pytest.mark.usefixtures("plugin_tokens")
async def test_home_assistant_paths(api_token_validation: TestClient, coresys: CoreSys):
"""Test Home Assistant only paths."""
coresys.homeassistant.supervisor_token = "abc123"
resp = await api_token_validation.post(
"/addons/local_test/sys_options", headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 200