mirror of https://github.com/home-assistant/supervisor.git synced 2026-04-17 23:33:35 +01:00
supervisor/tests/api/test_auth.py
Stefan Agner 28fa0b35bd Use Unix socket for Supervisor to Core communication (#6590)
* Use Unix socket for Supervisor to Core communication

Switch internal Supervisor-to-Core HTTP and WebSocket communication
from TCP (port 8123) to a Unix domain socket.

The existing /run/supervisor directory on the host (already mounted
at /run/os inside the Supervisor container) is bind-mounted into the
Core container at /run/supervisor. Core receives the socket path via
the SUPERVISOR_CORE_API_SOCKET environment variable, creates the
socket there, and Supervisor connects to it via aiohttp.UnixConnector
at /run/os/core.sock.
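
A minimal sketch of the transport idea, using only the standard
library rather than aiohttp.UnixConnector (which is what Supervisor
uses according to the text above). The socket lives at a filesystem
path, so only processes that can see that path can connect. The
handler, the ping/pong payload, and the temporary path are
illustrative assumptions, not Supervisor code:

```python
import asyncio
import os
import tempfile


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Answer a single request line with a fixed reply (stand-in for Core)."""
    await reader.readline()
    writer.write(b"pong\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def roundtrip(socket_path: str) -> bytes:
    """Listen on socket_path, connect to it, and return the reply."""
    # The listening side creates the socket file, as Core does in the real setup.
    server = await asyncio.start_unix_server(handle, path=socket_path)
    async with server:
        # The connecting side only needs the path; no host, port, or token.
        reader, writer = await asyncio.open_unix_connection(socket_path)
        writer.write(b"ping\n")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply


def demo() -> bytes:
    """Run the roundtrip on a throwaway socket path (Unix-like systems only)."""
    with tempfile.TemporaryDirectory() as tmp:
        return asyncio.run(roundtrip(os.path.join(tmp, "core.sock")))
```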

Since the Unix socket is only reachable by processes on the same host,
requests arriving over it are implicitly trusted and authenticated as
the existing Supervisor system user. This removes the token round-trip
where Supervisor had to obtain and send Bearer tokens on every Core
API call. WebSocket connections are likewise authenticated implicitly,
skipping the auth_required/auth handshake.

Key design decisions:
- Version-gated by CORE_UNIX_SOCKET_MIN_VERSION so older Core
  versions transparently continue using TCP with token auth
- LANDINGPAGE is explicitly excluded (not a CalVer version)
- Hard-fails with a clear error if the socket file is unexpectedly
  missing when Unix socket communication is expected
- WSClient.connect() for Unix socket (no auth) and
  WSClient.connect_with_auth() for TCP (token auth) separate the
  two connection modes cleanly
- Token refresh always uses the TCP websession since it is inherently
  a TCP/Bearer-auth operation
- Logs which transport (Unix socket vs TCP) is being used on first
  request

Closes #6626
Related Core PR: home-assistant/core#163907

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Close WebSocket on handshake failure and validate auth_required

Ensure the underlying WebSocket connection is closed before raising
when the handshake produces an unexpected message. Also validate that
the first TCP message is auth_required before sending credentials.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Fix pylint protected-access warnings in tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Check running container env before using Unix socket

Split use_unix_socket into two properties to handle the Supervisor
upgrade transition where Core is still running with a container
started by the old Supervisor (without SUPERVISOR_CORE_API_SOCKET):

- supports_unix_socket: version check only, used when creating the
  Core container to decide whether to set the env var
- use_unix_socket: version check + running container env check, used
  for communication decisions

This ensures TCP fallback during the upgrade transition while still
hard-failing if the socket is missing after Supervisor configured
Core to use it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Improve Core API communication logging and error handling

- Remove transport log from make_request that logged before Core
  container was attached, causing misleading connection logs
- Log "Connected to Core via ..." once on first successful API response
  in get_api_state, when the transport is actually known
- Remove explicit socket existence check from session property, let
  aiohttp UnixConnector produce natural connection errors during
  Core startup (same as TCP connection refused)
- Add validation in get_core_state matching get_config pattern
- Restore make_request docstring

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Guard Core API requests with container running check

Add is_running() check to make_request and connect_websocket so no
HTTP or WebSocket connection is attempted when the Core container is
not running. This avoids misleading connection attempts during
Supervisor startup before Core is ready.

Also make use_unix_socket raise if container metadata is not available
instead of silently falling back to TCP. This is a defensive check
since is_running() guards should prevent reaching this state.

Add attached property to DockerInterface to expose whether container
metadata has been loaded.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Reset Core API connection state on container stop

Listen for Core container STOPPED/FAILED events to reset the
connection state: clear the _core_connected flag so the transport
is logged again on next successful connection, and close any stale
Unix socket session.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Only mount /run/supervisor if we use it

* Fix pytest errors

* Remove redundant is_running check from ingress panel update

The is_running() guard in update_hass_panel is now redundant since
make_request checks is_running() internally. Also mock is_running
in the websession test fixture since tests using it need make_request
to proceed past the container running check.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Bind mount /run/supervisor to Supervisor /run/os

Home Assistant OS (as well as the Supervised run scripts) bind mount
/run/supervisor to /run/os in Supervisor. Since we reuse this location
for the communication socket between Supervisor and Core, we need to
also bind mount /run/supervisor to Supervisor /run/os in CI.

* Wrap WebSocket handshake errors in HomeAssistantAPIError

Unexpected exceptions during the WebSocket handshake (KeyError,
ValueError, TypeError from malformed messages) are now wrapped in
HomeAssistantAPIError inside WSClient.connect/connect_with_auth.
This means callers only need to catch HomeAssistantAPIError.

Remove the now-unnecessary except (RuntimeError, ValueError,
TypeError) from proxy _websocket_client and add a proper error
message to the APIError per review feedback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Narrow WebSocket handshake exception handling

Replace broad `except Exception` with specific exception types that
can actually occur during the WebSocket handshake: KeyError (missing
dict keys), ValueError (bad JSON), TypeError (non-text WS message),
aiohttp.ClientError (connection errors), and TimeoutError. This
avoids silently wrapping programming errors into HomeAssistantAPIError.
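
A rough sketch of the narrowed handling for the message-parsing part
of the handshake. HomeAssistantAPIError is redefined locally as a
stand-in, parse_handshake is a hypothetical helper, and the
connection-level errors (aiohttp.ClientError, TimeoutError) are left
out to keep the example free of third-party imports:

```python
import json


class HomeAssistantAPIError(Exception):
    """Local stand-in for supervisor.exceptions.HomeAssistantAPIError."""


def parse_handshake(raw: object, expected_type: str) -> dict:
    """Parse one handshake message and verify its type field."""
    try:
        if not isinstance(raw, str):
            # Non-text WebSocket message.
            raise TypeError(f"expected text message, got {type(raw).__name__}")
        message = json.loads(raw)  # ValueError (JSONDecodeError) on bad JSON
        actual = message["type"]  # KeyError if missing, TypeError if not a dict
    except (KeyError, ValueError, TypeError) as err:
        # Only the errors a malformed message can cause are wrapped;
        # anything else propagates unchanged.
        raise HomeAssistantAPIError(f"Unexpected handshake message: {err!r}") from err
    if actual != expected_type:
        raise HomeAssistantAPIError(f"Expected {expected_type}, got {actual}")
    return message
```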

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Remove unused create_mountpoint from MountBindOptions

The field was added but never used. The /run/supervisor host path
is guaranteed to exist since HAOS creates it for the Supervisor
container mount, so auto-creating the mountpoint is unnecessary.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Clear stale access token before raising on final retry

Move token clear before the attempt check in connect_websocket so
the stale token is always discarded, even when raising on the final
attempt. Without this, the next call would reuse the cached bad token
via _ensure_access_token's fast path, wasting a round-trip.
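
The ordering matters as in the following simplified sketch. It is
synchronous and uses hypothetical names (Client, AuthFailed, the
fetch_token and connect callables, a default of two attempts); the
real connect_websocket is not reproduced here:

```python
from collections.abc import Callable


class AuthFailed(Exception):
    """Hypothetical error for a connection rejected because of a bad token."""


class Client:
    """Caches a token and retries the connection when the token is rejected."""

    def __init__(
        self, fetch_token: Callable[[], str], connect: Callable[[str], str]
    ) -> None:
        self._fetch_token = fetch_token
        self._connect = connect
        self.access_token: str | None = None

    def ensure_access_token(self) -> str:
        """Return the cached token (fast path) or fetch a new one."""
        if self.access_token is None:
            self.access_token = self._fetch_token()
        return self.access_token

    def connect_websocket(self, attempts: int = 2) -> str:
        """Connect, refreshing the token between attempts."""
        for attempt in range(1, attempts + 1):
            token = self.ensure_access_token()
            try:
                return self._connect(token)
            except AuthFailed:
                # Clear first, then check the attempt count, so the stale
                # token is discarded even when this is the final attempt.
                self.access_token = None
                if attempt == attempts:
                    raise
        raise RuntimeError("attempts must be at least 1")
```

With the clear placed after the attempt check instead, the final
failure would leave the rejected token cached, and the next call
would reuse it through the fast path.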

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add tests for Unix socket communication and Core API

Add tests for the new Unix socket communication path and improve
existing test coverage:

- Version-based supports_unix_socket and env-based use_unix_socket
- api_url/ws_url transport selection
- Connection lifecycle: connected log after restart, ignoring
  unrelated container events
- get_api_state/check_api_state parameterized across versions,
  responses, and error cases
- make_request is_running guard and TCP flow with real token fetch
- connect_websocket for both Unix and TCP (with token verification)
- WSClient.connect/connect_with_auth handshake success, errors,
  cleanup on failure, and close with pending futures

Consolidate existing tests into parameterized form and drop synthetic
tests that covered very little.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 15:09:38 +02:00

"""Test auth API."""

from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch

from aiohttp.hdrs import WWW_AUTHENTICATE
from aiohttp.test_utils import TestClient
import pytest

from supervisor.addons.addon import Addon
from supervisor.coresys import CoreSys
from supervisor.exceptions import HomeAssistantAPIError, HomeAssistantWSError
from supervisor.homeassistant.api import HomeAssistantAPI

from tests.common import MockResponse
from tests.const import TEST_ADDON_SLUG

LIST_USERS_RESPONSE = [
    {
        "id": "a1d90e114a3b4da4a487fe327918dcef",
        "username": None,
        "name": "Home Assistant Content",
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": True,
        "group_ids": ["system-read-only"],
        "credentials": [],
    },
    {
        "id": "d25a2ca897704a31ac9534b5324dc230",
        "username": None,
        "name": "Supervisor",
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": True,
        "group_ids": ["system-admin"],
        "credentials": [],
    },
    {
        "id": "0b39e9305ba64531a8fee9ed5b86876e",
        "username": None,
        "name": "Home Assistant Cast",
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": True,
        "group_ids": ["system-admin"],
        "credentials": [],
    },
    {
        "id": "514698a459cd4ce0b75f137a3d7df539",
        "username": "test",
        "name": "Test",
        "is_owner": True,
        "is_active": True,
        "local_only": False,
        "system_generated": False,
        "group_ids": ["system-admin"],
        "credentials": [{"type": "homeassistant"}],
    },
    {
        "id": "7d5fac79097a4eb49aff83cdf20821b0",
        "username": None,
        "name": None,
        "is_owner": False,
        "is_active": True,
        "local_only": False,
        "system_generated": False,
        "group_ids": ["system-admin"],
        "credentials": [{"type": "command_line"}],
    },
]


@pytest.fixture(name="mock_check_login")
def fixture_mock_check_login(coresys: CoreSys):
    """Patch sys_auth.check_login."""
    with patch.object(coresys.auth, "check_login", new_callable=AsyncMock) as mock:
        yield mock


async def test_password_reset(
    api_client: TestClient,
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    websession: MagicMock,
):
    """Test password reset api."""
    coresys.homeassistant.api._access_token = "abc123"  # pylint: disable=protected-access
    # pylint: disable-next=protected-access
    coresys.homeassistant.api._access_token_expires = datetime.now(tz=UTC) + timedelta(
        days=1
    )
    websession.request = MagicMock(return_value=MockResponse(status=200))
    resp = await api_client.post(
        "/auth/reset", json={"username": "john", "password": "doe"}
    )
    assert resp.status == 200
    assert "Successful password reset for 'john'" in caplog.text


@pytest.mark.parametrize(
    ("request_mock", "expected_log"),
    [
        (
            MagicMock(return_value=MockResponse(status=400)),
            "The user 'john' is not registered",
        ),
        (
            MagicMock(side_effect=HomeAssistantAPIError("fail")),
            "Can't request password reset on Home Assistant: fail",
        ),
    ],
)
async def test_failed_password_reset(
    api_client: TestClient,
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    websession: MagicMock,
    request_mock: MagicMock,
    expected_log: str,
):
    """Test failed password reset."""
    coresys.homeassistant.api._access_token = "abc123"  # pylint: disable=protected-access
    # pylint: disable-next=protected-access
    coresys.homeassistant.api._access_token_expires = datetime.now(tz=UTC) + timedelta(
        days=1
    )
    websession.request = request_mock
    resp = await api_client.post(
        "/auth/reset", json={"username": "john", "password": "doe"}
    )
    assert resp.status == 400
    body = await resp.json()
    assert (
        body["message"]
        == "Username 'john' does not exist. Check list of users using 'ha auth list'."
    )
    assert body["error_key"] == "auth_password_reset_error"
    assert body["extra_fields"] == {
        "user": "john",
        "auth_list_command": "ha auth list",
    }
    assert expected_log in caplog.text


async def test_list_users(
    api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
    """Test list users api."""
    ha_ws_client.async_send_command.return_value = LIST_USERS_RESPONSE
    resp = await api_client.get("/auth/list")
    assert resp.status == 200
    result = await resp.json()
    assert result["data"]["users"] == [
        {
            "username": "test",
            "name": "Test",
            "is_owner": True,
            "is_active": True,
            "local_only": False,
            "group_ids": ["system-admin"],
        },
    ]


async def test_list_users_ws_error(
    api_client: TestClient,
    ha_ws_client: AsyncMock,
    caplog: pytest.LogCaptureFixture,
):
    """Test WS error when listing users via API."""
    ha_ws_client.async_send_command = AsyncMock(
        side_effect=HomeAssistantWSError("fail")
    )
    resp = await api_client.get("/auth/list")
    assert resp.status == 500
    result = await resp.json()
    assert result == {
        "result": "error",
        "message": "Can't request listing users on Home Assistant. Check Supervisor logs for details",
        "error_key": "auth_list_users_error",
    }
    assert "Can't request listing users on Home Assistant: fail" in caplog.text


@pytest.mark.parametrize(
    ("field", "api_client"),
    [("username", TEST_ADDON_SLUG), ("user", TEST_ADDON_SLUG)],
    indirect=["api_client"],
)
async def test_auth_json_success(
    api_client: TestClient,
    mock_check_login: AsyncMock,
    install_addon_ssh: Addon,
    field: str,
):
    """Test successful JSON auth."""
    mock_check_login.return_value = True
    resp = await api_client.post("/auth", json={field: "test", "password": "pass"})
    assert resp.status == 200


@pytest.mark.parametrize(
    ("user", "password", "api_client"),
    [
        (None, "password", TEST_ADDON_SLUG),
        ("user", None, TEST_ADDON_SLUG),
    ],
    indirect=["api_client"],
)
async def test_auth_json_failure_none(
    api_client: TestClient,
    mock_check_login: AsyncMock,
    install_addon_ssh: Addon,
    user: str | None,
    password: str | None,
):
    """Test failed JSON auth with none user or password."""
    mock_check_login.return_value = True
    resp = await api_client.post("/auth", json={"username": user, "password": password})
    assert resp.status == 401
    assert (
        resp.headers["WWW-Authenticate"]
        == 'Basic realm="Home Assistant Authentication"'
    )
    body = await resp.json()
    assert body["message"] == "Username and password must be strings"
    assert body["error_key"] == "auth_invalid_non_string_value_error"


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_invalid_credentials(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test failed JSON auth due to invalid credentials."""
    mock_check_login.return_value = False
    resp = await api_client.post(
        "/auth", json={"username": "test", "password": "wrong"}
    )
    assert WWW_AUTHENTICATE not in resp.headers
    assert resp.status == 401


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_empty_body(api_client: TestClient, install_addon_ssh: Addon):
    """Test JSON auth with empty body."""
    resp = await api_client.post(
        "/auth", data="", headers={"Content-Type": "application/json"}
    )
    assert resp.status == 401


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_invalid_json(api_client: TestClient, install_addon_ssh: Addon):
    """Test JSON auth with malformed JSON."""
    resp = await api_client.post(
        "/auth", data="{not json}", headers={"Content-Type": "application/json"}
    )
    assert resp.status == 400


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_urlencoded_success(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test successful URL-encoded auth."""
    mock_check_login.return_value = True
    resp = await api_client.post(
        "/auth",
        data="username=test&password=pass",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    assert resp.status == 200


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_urlencoded_failure(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test URL-encoded auth with invalid credentials."""
    mock_check_login.return_value = False
    resp = await api_client.post(
        "/auth",
        data="username=test&password=fail",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    assert WWW_AUTHENTICATE not in resp.headers
    assert resp.status == 401


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_unsupported_content_type(
    api_client: TestClient, install_addon_ssh: Addon
):
    """Test auth with unsupported content type."""
    resp = await api_client.post(
        "/auth", data="something", headers={"Content-Type": "text/plain"}
    )
    assert "Basic realm" in resp.headers[WWW_AUTHENTICATE]
    assert resp.status == 401


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_basic_auth(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test auth with BasicAuth header."""
    mock_check_login.return_value = True
    resp = await api_client.post(
        "/auth", headers={"Authorization": "Basic dGVzdDpwYXNz"}
    )
    assert resp.status == 200


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_basic_auth_failure(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test auth with BasicAuth header and failure."""
    mock_check_login.return_value = False
    resp = await api_client.post(
        "/auth", headers={"Authorization": "Basic dGVzdDpwYXNz"}
    )
    assert resp.status == 401


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_bearer_token_returns_401(
    api_client: TestClient, install_addon_ssh: Addon
):
    """Test that a Bearer token in Authorization header returns 401, not 500."""
    resp = await api_client.post(
        "/auth", headers={"Authorization": "Bearer sometoken123"}
    )
    assert "Basic realm" in resp.headers[WWW_AUTHENTICATE]
    assert resp.status == 401


@pytest.mark.parametrize("api_client", ["local_example"], indirect=True)
async def test_auth_addon_no_auth_access(
    api_client: TestClient, install_addon_example: Addon
):
    """Test auth where add-on is not allowed to access auth API."""
    resp = await api_client.post("/auth", json={"username": "test", "password": "pass"})
    assert resp.status == 403


async def test_non_addon_token_no_auth_access(api_client: TestClient):
    """Test auth where a non-add-on token is not allowed to access auth API."""
    resp = await api_client.post("/auth", json={"username": "test", "password": "pass"})
    assert resp.status == 403


@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
@pytest.mark.usefixtures("install_addon_ssh")
async def test_auth_backend_login_failure(api_client: TestClient):
    """Test backend login failure on auth."""
    with (
        patch.object(HomeAssistantAPI, "check_api_state", return_value=True),
        patch.object(
            HomeAssistantAPI, "make_request", side_effect=HomeAssistantAPIError("fail")
        ),
    ):
        resp = await api_client.post(
            "/auth", json={"username": "test", "password": "pass"}
        )

    assert resp.status == 500
    body = await resp.json()
    assert (
        body["message"]
        == "Unable to validate authentication details with Home Assistant. Check Supervisor logs for details"
    )
    assert body["error_key"] == "auth_home_assistant_api_validation_error"
    assert "extra_fields" not in body