mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-02 08:12:47 +01:00
* Unify Core user listing with HomeAssistantUser model Replace the ingress-specific IngressSessionDataUser with a general HomeAssistantUser dataclass that models the Core config/auth/list WS response. This deduplicates the WS call (previously in both auth.py and module.py) into a single HomeAssistant.list_users() method. - Add HomeAssistantUser dataclass with fields matching Core's user API - Remove get_users() and its unnecessary 5-minute Job throttle - Auth and ingress consumers both use HomeAssistant.list_users() - Auth API endpoint uses typed attribute access instead of dict keys - Migrate session serialization from legacy "displayname" to "name" - Accept both keys in schema/deserialization for backwards compat - Add test for loading persisted sessions with legacy displayname key Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Tighten list_users() to trust Core's auth/list contract Core's config/auth/list WS command always returns a list, never None. Replace the silent `if not raw: return []` (which also swallowed empty lists) with an assert, remove the dead AuthListUsersNoneResponseError exception class, and document the HomeAssistantWSError contract in the docstring. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Remove | None from async_send_command return type The WebSocket result is always set from data["result"] in _receive_json, never explicitly to None. Remove the misleading | None from the return type of both WSClient and HomeAssistantWebSocket async_send_command, and drop the now-unnecessary assert in list_users. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Use HomeAssistantWSConnectionError in _ensure_connected _ensure_connected and connect_with_auth raise on connection-level failures, so use the more specific HomeAssistantWSConnectionError instead of the broad HomeAssistantWSError. This allows callers to distinguish connection errors from Core API errors (e.g. unsuccessful WebSocket command responses). 
Also document that _ensure_connected can propagate HomeAssistantAuthError from ensure_access_token. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Remove user list cache from _find_user_by_id Drop the _list_of_users cache to avoid stale auth data in ingress session creation. The method now fetches users fresh each time and returns None on any API error instead of serving potentially outdated cached results. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
369 lines
12 KiB
Python
369 lines
12 KiB
Python
"""Test auth API."""
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from aiohttp.hdrs import WWW_AUTHENTICATE
|
|
from aiohttp.test_utils import TestClient
|
|
import pytest
|
|
|
|
from supervisor.addons.addon import Addon
|
|
from supervisor.coresys import CoreSys
|
|
from supervisor.exceptions import HomeAssistantAPIError, HomeAssistantWSError
|
|
from supervisor.homeassistant.api import HomeAssistantAPI
|
|
|
|
from tests.common import MockResponse
|
|
from tests.const import TEST_ADDON_SLUG
|
|
|
|
LIST_USERS_RESPONSE = [
|
|
{
|
|
"id": "a1d90e114a3b4da4a487fe327918dcef",
|
|
"username": None,
|
|
"name": "Home Assistant Content",
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": True,
|
|
"group_ids": ["system-read-only"],
|
|
"credentials": [],
|
|
},
|
|
{
|
|
"id": "d25a2ca897704a31ac9534b5324dc230",
|
|
"username": None,
|
|
"name": "Supervisor",
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": True,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [],
|
|
},
|
|
{
|
|
"id": "0b39e9305ba64531a8fee9ed5b86876e",
|
|
"username": None,
|
|
"name": "Home Assistant Cast",
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": True,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [],
|
|
},
|
|
{
|
|
"id": "514698a459cd4ce0b75f137a3d7df539",
|
|
"username": "test",
|
|
"name": "Test",
|
|
"is_owner": True,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": False,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [{"type": "homeassistant"}],
|
|
},
|
|
{
|
|
"id": "7d5fac79097a4eb49aff83cdf20821b0",
|
|
"username": None,
|
|
"name": None,
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": False,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [{"type": "command_line"}],
|
|
},
|
|
]
|
|
|
|
|
|
@pytest.fixture(name="mock_check_login")
def fixture_mock_check_login(coresys: CoreSys):
    """Replace ``coresys.auth.check_login`` with an AsyncMock for one test.

    Yields the mock so the test can set its return value; the patch is
    undone when the fixture is torn down.
    """
    patcher = patch.object(coresys.auth, "check_login", new_callable=AsyncMock)
    with patcher as check_login_mock:
        yield check_login_mock
|
|
|
|
|
|
async def test_password_reset(
    api_client: TestClient,
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    websession: MagicMock,
):
    """Test that a password reset succeeds when the mocked request returns 200."""
    # Give the Home Assistant API object a token that stays valid for a day.
    coresys.homeassistant.api.access_token = "abc123"
    token_expiry = datetime.now(tz=UTC) + timedelta(days=1)
    # pylint: disable-next=protected-access
    coresys.homeassistant.api._access_token_expires = token_expiry

    websession.request = MagicMock(return_value=MockResponse(status=200))
    credentials = {"username": "john", "password": "doe"}
    response = await api_client.post("/auth/reset", json=credentials)

    assert response.status == 200
    assert "Successful password reset for 'john'" in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("request_mock", "expected_log"),
    [
        (
            MagicMock(return_value=MockResponse(status=400)),
            "The user 'john' is not registered",
        ),
        (
            MagicMock(side_effect=HomeAssistantAPIError("fail")),
            "Can't request password reset on Home Assistant: fail",
        ),
    ],
)
async def test_failed_password_reset(
    api_client: TestClient,
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    websession: MagicMock,
    request_mock: MagicMock,
    expected_log: str,
):
    """Test that a failing password reset answers 400 and logs the cause.

    Two cases are covered: the mocked request returns status 400, and the
    mocked request raises HomeAssistantAPIError. Both must produce the same
    error body; only the log line differs.
    """
    # Give the Home Assistant API object a token that stays valid for a day.
    coresys.homeassistant.api.access_token = "abc123"
    token_expiry = datetime.now(tz=UTC) + timedelta(days=1)
    # pylint: disable-next=protected-access
    coresys.homeassistant.api._access_token_expires = token_expiry

    websession.request = request_mock
    credentials = {"username": "john", "password": "doe"}
    response = await api_client.post("/auth/reset", json=credentials)
    assert response.status == 400

    error = await response.json()
    assert error["message"] == (
        "Username 'john' does not exist. Check list of users using 'ha auth list'."
    )
    assert error["error_key"] == "auth_password_reset_error"
    expected_extra = {"user": "john", "auth_list_command": "ha auth list"}
    assert error["extra_fields"] == expected_extra
    assert expected_log in caplog.text
|
|
|
|
|
|
async def test_list_users(
    api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
    """Test that /auth/list returns the expected users for the canned WS reply.

    Of the five entries in LIST_USERS_RESPONSE only the "test" user is
    expected back, and without its id, system_generated and credentials
    fields.
    """
    ha_ws_client.async_send_command.return_value = LIST_USERS_RESPONSE

    response = await api_client.get("/auth/list")
    assert response.status == 200

    expected_users = [
        {
            "username": "test",
            "name": "Test",
            "is_owner": True,
            "is_active": True,
            "local_only": False,
            "group_ids": ["system-admin"],
        },
    ]
    payload = await response.json()
    assert payload["data"]["users"] == expected_users
|
|
|
|
|
|
async def test_list_users_ws_error(
    api_client: TestClient,
    ha_ws_client: AsyncMock,
    caplog: pytest.LogCaptureFixture,
):
    """Test that /auth/list answers 500 when the WS command raises."""
    failing_command = AsyncMock(side_effect=HomeAssistantWSError("fail"))
    ha_ws_client.async_send_command = failing_command

    response = await api_client.get("/auth/list")
    assert response.status == 500

    expected_body = {
        "result": "error",
        "message": "Can't request listing users on Home Assistant. Check supervisor logs for details (check with 'ha supervisor logs')",
        "error_key": "auth_list_users_error",
        "extra_fields": {"logs_command": "ha supervisor logs"},
    }
    assert await response.json() == expected_body
    # The underlying exception text must appear in the log.
    assert "Can't request listing users on Home Assistant: fail" in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("field", "api_client"),
    [("username", TEST_ADDON_SLUG), ("user", TEST_ADDON_SLUG)],
    indirect=["api_client"],
)
async def test_auth_json_success(
    api_client: TestClient,
    mock_check_login: AsyncMock,
    install_addon_ssh: Addon,
    field: str,
):
    """Test that JSON auth succeeds with either the "username" or "user" key."""
    mock_check_login.return_value = True

    credentials = {field: "test", "password": "pass"}
    response = await api_client.post("/auth", json=credentials)

    assert response.status == 200
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("user", "password", "api_client"),
    [
        (None, "password", TEST_ADDON_SLUG),
        ("user", None, TEST_ADDON_SLUG),
    ],
    indirect=["api_client"],
)
async def test_auth_json_failure_none(
    api_client: TestClient,
    mock_check_login: AsyncMock,
    install_addon_ssh: Addon,
    user: str | None,
    password: str | None,
):
    """Test failed JSON auth when the username or the password is null.

    check_login is mocked to succeed, so the 401 asserted here comes from
    the request being rejected for its non-string value, not from a
    credential check.
    """
    mock_check_login.return_value = True
    resp = await api_client.post("/auth", json={"username": user, "password": password})
    assert resp.status == 401
    # Use the aiohttp header constant, as the other tests in this module do.
    assert (
        resp.headers[WWW_AUTHENTICATE]
        == 'Basic realm="Home Assistant Authentication"'
    )
    body = await resp.json()
    assert body["message"] == "Username and password must be strings"
    assert body["error_key"] == "auth_invalid_non_string_value_error"
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_invalid_credentials(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test that JSON auth with rejected credentials answers a plain 401."""
    mock_check_login.return_value = False

    credentials = {"username": "test", "password": "wrong"}
    response = await api_client.post("/auth", json=credentials)

    # A credential rejection must not carry a Basic-auth challenge header.
    assert WWW_AUTHENTICATE not in response.headers
    assert response.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_empty_body(api_client: TestClient, install_addon_ssh: Addon):
    """Test that a JSON auth request with an empty body answers 401."""
    json_headers = {"Content-Type": "application/json"}
    response = await api_client.post("/auth", data="", headers=json_headers)

    assert response.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_invalid_json(api_client: TestClient, install_addon_ssh: Addon):
    """Test that a JSON auth request with an unparsable body answers 400."""
    json_headers = {"Content-Type": "application/json"}
    malformed_body = "{not json}"
    response = await api_client.post(
        "/auth", data=malformed_body, headers=json_headers
    )

    assert response.status == 400
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_urlencoded_success(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test that form-encoded auth succeeds when check_login accepts."""
    mock_check_login.return_value = True

    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    form_body = "username=test&password=pass"
    response = await api_client.post("/auth", data=form_body, headers=form_headers)

    assert response.status == 200
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_urlencoded_failure(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test that form-encoded auth with rejected credentials answers a plain 401."""
    mock_check_login.return_value = False

    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    form_body = "username=test&password=fail"
    response = await api_client.post("/auth", data=form_body, headers=form_headers)

    # A credential rejection must not carry a Basic-auth challenge header.
    assert WWW_AUTHENTICATE not in response.headers
    assert response.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_unsupported_content_type(
    api_client: TestClient, install_addon_ssh: Addon
):
    """Test that a text/plain auth request answers 401 with a Basic challenge."""
    plain_headers = {"Content-Type": "text/plain"}
    response = await api_client.post(
        "/auth", data="something", headers=plain_headers
    )

    challenge = response.headers[WWW_AUTHENTICATE]
    assert "Basic realm" in challenge
    assert response.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_basic_auth(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test that auth via an Authorization: Basic header succeeds."""
    mock_check_login.return_value = True

    # "dGVzdDpwYXNz" is the base64 encoding of "test:pass".
    basic_headers = {"Authorization": "Basic dGVzdDpwYXNz"}
    response = await api_client.post("/auth", headers=basic_headers)

    assert response.status == 200
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_basic_auth_failure(
    api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
):
    """Test that auth via an Authorization: Basic header answers 401 on rejection."""
    mock_check_login.return_value = False

    # "dGVzdDpwYXNz" is the base64 encoding of "test:pass".
    basic_headers = {"Authorization": "Basic dGVzdDpwYXNz"}
    response = await api_client.post("/auth", headers=basic_headers)

    assert response.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", ["local_example"], indirect=True)
async def test_auth_addon_no_auth_access(
    api_client: TestClient, install_addon_example: Addon
):
    """Test that an add-on without access to the auth API gets 403."""
    credentials = {"username": "test", "password": "pass"}
    response = await api_client.post("/auth", json=credentials)

    assert response.status == 403
|
|
|
|
|
|
async def test_non_addon_token_no_auth_access(api_client: TestClient):
    """Test that a request made with a non-add-on token gets 403 on /auth.

    Unlike the add-on tests in this module, ``api_client`` is not
    parametrized with an add-on slug here.
    """
    resp = await api_client.post("/auth", json={"username": "test", "password": "pass"})
    assert resp.status == 403
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
@pytest.mark.usefixtures("install_addon_ssh")
async def test_auth_backend_login_failure(api_client: TestClient):
    """Test that auth answers 500 when the Home Assistant API request raises."""
    # check_api_state is patched to report True while make_request raises.
    api_state_patch = patch.object(
        HomeAssistantAPI, "check_api_state", return_value=True
    )
    failing_request_patch = patch.object(
        HomeAssistantAPI, "make_request", side_effect=HomeAssistantAPIError("fail")
    )
    credentials = {"username": "test", "password": "pass"}

    with api_state_patch, failing_request_patch:
        response = await api_client.post("/auth", json=credentials)
        assert response.status == 500

        error = await response.json()
        assert error["message"] == (
            "Unable to validate authentication details with Home Assistant. Check supervisor logs for details (check with 'ha supervisor logs')"
        )
        assert error["error_key"] == "auth_home_assistant_api_validation_error"
        assert error["extra_fields"] == {"logs_command": "ha supervisor logs"}
|