1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-05-19 06:08:51 +01:00
Files
supervisor/tests/api/test_auth.py
Stefan Agner 7fb621234e Add Unix socket support for Core communication with feature flag (#6742)
* Use Unix socket for Supervisor to Core communication

Reintroduce Unix socket support for Supervisor-to-Core communication
(reverted in #6735) with the addition of a feature flag gate. The
feature is now controlled by the `core_unix_socket` feature flag and
disabled by default.

When enabled and Core version supports it, Supervisor communicates with
Core via a Unix socket at /run/os/core.sock instead of TCP. This
eliminates the need for access token authentication on the socket path,
as Core authenticates the peer by the socket connection itself.

Key changes:
- Add FeatureFlag.CORE_UNIX_SOCKET to gate the feature
- HomeAssistantAPI: transport-aware session/url/websocket management
- WSClient: separate connect() (Unix, no auth) and connect_with_auth()
  (TCP) class methods with proper error handling
- APIProxy delegates websocket setup to api.connect_websocket()
- Container state tracking for Unix session lifecycle
- CI builder mounts /run/supervisor for integration tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Sort feature flags alphabetically

* Drop per-call max_msg_size from WSClient

Hardcode the WebSocket message size cap to 64 MB in WSClient and remove
the parameter from WSClient.connect, connect_with_auth, _ws_connect,
and HomeAssistantAPI.connect_websocket. This was only ever overridden
by APIProxy, so threading it through four layers was unnecessary.

max_msg_size is a cap, not a pre-allocation; aiohttp only grows buffers
to the size of actual incoming messages. Supervisor's own control
channel never approaches 64 MB, so unifying the limit has no runtime
cost.

Addresses review feedback on #6742.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 15:03:05 +02:00

380 lines
12 KiB
Python

"""Test auth API."""
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from aiohttp.hdrs import WWW_AUTHENTICATE
from aiohttp.test_utils import TestClient
import pytest
from supervisor.addons.addon import App
from supervisor.coresys import CoreSys
from supervisor.exceptions import HomeAssistantAPIError, HomeAssistantWSError
from supervisor.homeassistant.api import HomeAssistantAPI
from tests.common import MockResponse
from tests.const import TEST_ADDON_SLUG
# Canned user list that test_list_users feeds to the mocked
# ``ha_ws_client.async_send_command``. It holds five entries: three
# system-generated users without a username, one owner ("test") with
# "homeassistant" credentials, and one nameless user with "command_line"
# credentials. test_list_users expects only the "test" user in the API output.
LIST_USERS_RESPONSE = [
    {
        "id": "a1d90e114a3b4da4a487fe327918dcef",
        "username": None,
        "name": "Home Assistant Content",
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": True,
        "group_ids": ["system-read-only"],
        "credentials": [],
    },
    {
        "id": "d25a2ca897704a31ac9534b5324dc230",
        "username": None,
        "name": "Supervisor",
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": True,
        "group_ids": ["system-admin"],
        "credentials": [],
    },
    {
        "id": "0b39e9305ba64531a8fee9ed5b86876e",
        "username": None,
        "name": "Home Assistant Cast",
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": True,
        "group_ids": ["system-admin"],
        "credentials": [],
    },
    {
        "id": "514698a459cd4ce0b75f137a3d7df539",
        "username": "test",
        "name": "Test",
        "is_owner": True,
        "is_active": True,
        "local_only": False,
        "system_generated": False,
        "group_ids": ["system-admin"],
        "credentials": [{"type": "homeassistant"}],
    },
    {
        "id": "7d5fac79097a4eb49aff83cdf20821b0",
        "username": None,
        "name": None,
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": False,
        "group_ids": ["system-admin"],
        "credentials": [{"type": "command_line"}],
    },
]
@pytest.fixture(name="mock_check_login")
def fixture_mock_check_login(coresys: CoreSys):
    """Yield an AsyncMock standing in for ``coresys.auth.check_login``.

    The patch is active for as long as the fixture is in use and is undone
    when the generator resumes after the test.
    """
    patcher = patch.object(coresys.auth, "check_login", new_callable=AsyncMock)
    with patcher as check_login_mock:
        yield check_login_mock
async def test_password_reset(
    api_client: TestClient,
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    websession: MagicMock,
):
    """Check that POST /auth/reset returns 200 and logs the successful reset.

    The mocked websession request answers with status 200.
    """
    ha_api = coresys.homeassistant.api
    # Seed an access token whose expiry lies one day in the future
    # (presumably so no token refresh is attempted during the request).
    # pylint: disable=protected-access
    ha_api._access_token = "abc123"
    ha_api._access_token_expires = datetime.now(tz=UTC) + timedelta(days=1)
    # pylint: enable=protected-access

    websession.request = MagicMock(return_value=MockResponse(status=200))

    credentials = {"username": "john", "password": "doe"}
    response = await api_client.post("/auth/reset", json=credentials)

    assert response.status == 200
    assert "Successful password reset for 'john'" in caplog.text
@pytest.mark.parametrize(
    ("request_mock", "expected_log"),
    [
        (
            MagicMock(return_value=MockResponse(status=400)),
            "The user 'john' is not registered",
        ),
        (
            MagicMock(side_effect=HomeAssistantAPIError("fail")),
            "Can't request password reset on Home Assistant: fail",
        ),
    ],
)
async def test_failed_password_reset(
    api_client: TestClient,
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    websession: MagicMock,
    request_mock: MagicMock,
    expected_log: str,
):
    """Check that a failing password reset yields a 400 with a structured error.

    Two failure modes are covered: the mocked request returning status 400,
    and the mocked request raising HomeAssistantAPIError. Both must produce
    the same API error body; only the expected log line differs.
    """
    ha_api = coresys.homeassistant.api
    # Seed an access token whose expiry lies one day in the future.
    # pylint: disable=protected-access
    ha_api._access_token = "abc123"
    ha_api._access_token_expires = datetime.now(tz=UTC) + timedelta(days=1)
    # pylint: enable=protected-access

    websession.request = request_mock

    credentials = {"username": "john", "password": "doe"}
    response = await api_client.post("/auth/reset", json=credentials)
    assert response.status == 400

    payload = await response.json()
    expected_message = (
        "Username 'john' does not exist. Check list of users using 'ha auth list'."
    )
    assert payload["message"] == expected_message
    assert payload["error_key"] == "auth_password_reset_error"
    assert payload["extra_fields"] == {
        "user": "john",
        "auth_list_command": "ha auth list",
    }
    assert expected_log in caplog.text
async def test_list_users(
    api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
    """Check that GET /auth/list returns only the "test" user, with reduced fields.

    The mocked websocket client returns LIST_USERS_RESPONSE.
    """
    ha_ws_client.async_send_command.return_value = LIST_USERS_RESPONSE

    # Only the "test" entry is expected, without its id, system_generated
    # and credentials keys.
    expected_users = [
        {
            "username": "test",
            "name": "Test",
            "is_owner": True,
            "is_active": True,
            "local_only": False,
            "group_ids": ["system-admin"],
        },
    ]

    response = await api_client.get("/auth/list")
    assert response.status == 200

    payload = await response.json()
    assert payload["data"]["users"] == expected_users
async def test_list_users_ws_error(
    api_client: TestClient,
    ha_ws_client: AsyncMock,
    caplog: pytest.LogCaptureFixture,
):
    """Check that a websocket error while listing users gives a 500 error body.

    The detailed failure reason is expected in the log, not in the response.
    """
    failing_command = AsyncMock(side_effect=HomeAssistantWSError("fail"))
    ha_ws_client.async_send_command = failing_command

    response = await api_client.get("/auth/list")
    assert response.status == 500

    expected_body = {
        "result": "error",
        "message": "Can't request listing users on Home Assistant. Check Supervisor logs for details",
        "error_key": "auth_list_users_error",
    }
    payload = await response.json()
    assert payload == expected_body
    assert "Can't request listing users on Home Assistant: fail" in caplog.text
@pytest.mark.parametrize(
    ("field", "api_client"),
    [("username", TEST_ADDON_SLUG), ("user", TEST_ADDON_SLUG)],
    indirect=["api_client"],
)
async def test_auth_json_success(
    api_client: TestClient,
    mock_check_login: AsyncMock,
    install_app_ssh: App,
    field: str,
):
    """Check that JSON auth succeeds with either "username" or "user" as the key."""
    mock_check_login.return_value = True

    credentials = {field: "test", "password": "pass"}
    response = await api_client.post("/auth", json=credentials)

    assert response.status == 200
@pytest.mark.parametrize(
    ("user", "password", "api_client"),
    [
        (None, "password", TEST_ADDON_SLUG),
        ("user", None, TEST_ADDON_SLUG),
    ],
    indirect=["api_client"],
)
async def test_auth_json_failure_none(
    api_client: TestClient,
    mock_check_login: AsyncMock,
    install_app_ssh: App,
    user: str | None,
    password: str | None,
):
    """Test failed JSON auth with none user or password.

    A JSON null for either the username or the password must be rejected
    with a 401, a Basic auth challenge header and a dedicated error key.
    """
    # check_login is mocked to succeed, so the 401 asserted below cannot be
    # attributed to the mocked credential check reporting a mismatch.
    mock_check_login.return_value = True
    resp = await api_client.post("/auth", json={"username": user, "password": password})
    assert resp.status == 401
    # Use the shared aiohttp header constant, like the other tests in this file.
    assert (
        resp.headers[WWW_AUTHENTICATE]
        == 'Basic realm="Home Assistant Authentication"'
    )
    body = await resp.json()
    assert body["message"] == "Username and password must be strings"
    assert body["error_key"] == "auth_invalid_non_string_value_error"
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_invalid_credentials(
    api_client: TestClient, mock_check_login: AsyncMock, install_app_ssh: App
):
    """Check that rejected JSON credentials give a 401 without an auth challenge."""
    mock_check_login.return_value = False

    credentials = {"username": "test", "password": "wrong"}
    response = await api_client.post("/auth", json=credentials)

    # No WWW-Authenticate header is expected when the credentials were
    # well-formed but rejected by check_login.
    assert WWW_AUTHENTICATE not in response.headers
    assert response.status == 401
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_empty_body(api_client: TestClient, install_app_ssh: App):
    """Check that an empty body sent as application/json is answered with 401."""
    json_headers = {"Content-Type": "application/json"}
    response = await api_client.post("/auth", data="", headers=json_headers)
    assert response.status == 401
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_invalid_json(api_client: TestClient, install_app_ssh: App):
    """Check that a malformed JSON body is answered with 400."""
    json_headers = {"Content-Type": "application/json"}
    response = await api_client.post(
        "/auth", data="{not json}", headers=json_headers
    )
    assert response.status == 400
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_urlencoded_success(
    api_client: TestClient, mock_check_login: AsyncMock, install_app_ssh: App
):
    """Check that form-encoded credentials are accepted when check_login passes."""
    mock_check_login.return_value = True

    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    form_body = "username=test&password=pass"
    response = await api_client.post("/auth", data=form_body, headers=form_headers)

    assert response.status == 200
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_urlencoded_failure(
    api_client: TestClient, mock_check_login: AsyncMock, install_app_ssh: App
):
    """Check that rejected form-encoded credentials give a 401 with no challenge."""
    mock_check_login.return_value = False

    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    form_body = "username=test&password=fail"
    response = await api_client.post("/auth", data=form_body, headers=form_headers)

    assert WWW_AUTHENTICATE not in response.headers
    assert response.status == 401
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_unsupported_content_type(
    api_client: TestClient, install_app_ssh: App
):
    """Check that a text/plain body gets a 401 carrying a Basic auth challenge."""
    plain_headers = {"Content-Type": "text/plain"}
    response = await api_client.post(
        "/auth", data="something", headers=plain_headers
    )

    challenge = response.headers[WWW_AUTHENTICATE]
    assert "Basic realm" in challenge
    assert response.status == 401
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_basic_auth(
    api_client: TestClient, mock_check_login: AsyncMock, install_app_ssh: App
):
    """Check that a Basic Authorization header is accepted when check_login passes."""
    mock_check_login.return_value = True

    auth_headers = {"Authorization": "Basic dGVzdDpwYXNz"}
    response = await api_client.post("/auth", headers=auth_headers)

    assert response.status == 200
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_basic_auth_failure(
    api_client: TestClient, mock_check_login: AsyncMock, install_app_ssh: App
):
    """Check that a Basic Authorization header gives 401 when check_login fails."""
    mock_check_login.return_value = False

    auth_headers = {"Authorization": "Basic dGVzdDpwYXNz"}
    response = await api_client.post("/auth", headers=auth_headers)

    assert response.status == 401
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_bearer_token_returns_401(
    api_client: TestClient, install_app_ssh: App
):
    """Check that a Bearer Authorization header is answered with 401, not 500.

    The response must also carry a Basic auth challenge.
    """
    bearer_headers = {"Authorization": "Bearer sometoken123"}
    response = await api_client.post("/auth", headers=bearer_headers)

    challenge = response.headers[WWW_AUTHENTICATE]
    assert "Basic realm" in challenge
    assert response.status == 401
@pytest.mark.parametrize("api_client", ["local_example"], indirect=True)
async def test_auth_app_no_auth_access(
    api_client: TestClient, install_app_example: App
):
    """Check that the "local_example" app gets a 403 from the auth API."""
    credentials = {"username": "test", "password": "pass"}
    response = await api_client.post("/auth", json=credentials)
    assert response.status == 403
async def test_non_app_token_no_auth_access(api_client: TestClient):
    """Test auth where a non-app token is not allowed to access auth API.

    Uses the default ``api_client`` fixture (no app slug parametrization) and
    expects the request to be rejected with 403.
    """
    resp = await api_client.post("/auth", json={"username": "test", "password": "pass"})
    assert resp.status == 403
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
@pytest.mark.usefixtures("install_app_ssh")
async def test_auth_backend_login_failure(api_client: TestClient):
    """Check that a Home Assistant API failure during login gives a generic 500.

    ``check_api_state`` is patched to return True and ``make_request`` to
    raise HomeAssistantAPIError. The response must carry the validation error
    key and no ``extra_fields``.
    """
    api_state_patch = patch.object(
        HomeAssistantAPI, "check_api_state", return_value=True
    )
    make_request_patch = patch.object(
        HomeAssistantAPI, "make_request", side_effect=HomeAssistantAPIError("fail")
    )
    credentials = {"username": "test", "password": "pass"}

    with api_state_patch, make_request_patch:
        response = await api_client.post("/auth", json=credentials)

    assert response.status == 500

    payload = await response.json()
    expected_message = "Unable to validate authentication details with Home Assistant. Check Supervisor logs for details"
    assert payload["message"] == expected_message
    assert payload["error_key"] == "auth_home_assistant_api_validation_error"
    assert "extra_fields" not in payload