mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-02 00:07:16 +01:00
* Remove CLI command hint from unknown error messages Since #6303 introduced specific error messages for many cases, the generic "check with 'ha supervisor logs'" hint in unknown error messages is no longer as useful. Remove the CLI command part while keeping the "Check supervisor logs for details" rider. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Use consistently "Supervisor logs" with capitalization Co-authored-by: Jan Čermák <sairon@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Jan Čermák <sairon@users.noreply.github.com>
380 lines
12 KiB
Python
380 lines
12 KiB
Python
"""Test auth API."""
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from aiohttp.hdrs import WWW_AUTHENTICATE
|
|
from aiohttp.test_utils import TestClient
|
|
import pytest
|
|
|
|
from supervisor.addons.addon import Addon
|
|
from supervisor.coresys import CoreSys
|
|
from supervisor.exceptions import HomeAssistantAPIError, HomeAssistantWSError
|
|
from supervisor.homeassistant.api import HomeAssistantAPI
|
|
|
|
from tests.common import MockResponse
|
|
from tests.const import TEST_ADDON_SLUG
|
|
|
|
LIST_USERS_RESPONSE = [
|
|
{
|
|
"id": "a1d90e114a3b4da4a487fe327918dcef",
|
|
"username": None,
|
|
"name": "Home Assistant Content",
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": True,
|
|
"group_ids": ["system-read-only"],
|
|
"credentials": [],
|
|
},
|
|
{
|
|
"id": "d25a2ca897704a31ac9534b5324dc230",
|
|
"username": None,
|
|
"name": "Supervisor",
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": True,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [],
|
|
},
|
|
{
|
|
"id": "0b39e9305ba64531a8fee9ed5b86876e",
|
|
"username": None,
|
|
"name": "Home Assistant Cast",
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": True,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [],
|
|
},
|
|
{
|
|
"id": "514698a459cd4ce0b75f137a3d7df539",
|
|
"username": "test",
|
|
"name": "Test",
|
|
"is_owner": True,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": False,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [{"type": "homeassistant"}],
|
|
},
|
|
{
|
|
"id": "7d5fac79097a4eb49aff83cdf20821b0",
|
|
"username": None,
|
|
"name": None,
|
|
"is_owner": False,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"system_generated": False,
|
|
"group_ids": ["system-admin"],
|
|
"credentials": [{"type": "command_line"}],
|
|
},
|
|
]
|
|
|
|
|
|
@pytest.fixture(name="mock_check_login")
|
|
def fixture_mock_check_login(coresys: CoreSys):
|
|
"""Patch sys_auth.check_login."""
|
|
with patch.object(coresys.auth, "check_login", new_callable=AsyncMock) as mock:
|
|
yield mock
|
|
|
|
|
|
async def test_password_reset(
|
|
api_client: TestClient,
|
|
coresys: CoreSys,
|
|
caplog: pytest.LogCaptureFixture,
|
|
websession: MagicMock,
|
|
):
|
|
"""Test password reset api."""
|
|
coresys.homeassistant.api.access_token = "abc123"
|
|
# pylint: disable-next=protected-access
|
|
coresys.homeassistant.api._access_token_expires = datetime.now(tz=UTC) + timedelta(
|
|
days=1
|
|
)
|
|
|
|
websession.request = MagicMock(return_value=MockResponse(status=200))
|
|
resp = await api_client.post(
|
|
"/auth/reset", json={"username": "john", "password": "doe"}
|
|
)
|
|
assert resp.status == 200
|
|
assert "Successful password reset for 'john'" in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("request_mock", "expected_log"),
|
|
[
|
|
(
|
|
MagicMock(return_value=MockResponse(status=400)),
|
|
"The user 'john' is not registered",
|
|
),
|
|
(
|
|
MagicMock(side_effect=HomeAssistantAPIError("fail")),
|
|
"Can't request password reset on Home Assistant: fail",
|
|
),
|
|
],
|
|
)
|
|
async def test_failed_password_reset(
|
|
api_client: TestClient,
|
|
coresys: CoreSys,
|
|
caplog: pytest.LogCaptureFixture,
|
|
websession: MagicMock,
|
|
request_mock: MagicMock,
|
|
expected_log: str,
|
|
):
|
|
"""Test failed password reset."""
|
|
coresys.homeassistant.api.access_token = "abc123"
|
|
# pylint: disable-next=protected-access
|
|
coresys.homeassistant.api._access_token_expires = datetime.now(tz=UTC) + timedelta(
|
|
days=1
|
|
)
|
|
|
|
websession.request = request_mock
|
|
resp = await api_client.post(
|
|
"/auth/reset", json={"username": "john", "password": "doe"}
|
|
)
|
|
assert resp.status == 400
|
|
body = await resp.json()
|
|
assert (
|
|
body["message"]
|
|
== "Username 'john' does not exist. Check list of users using 'ha auth list'."
|
|
)
|
|
assert body["error_key"] == "auth_password_reset_error"
|
|
assert body["extra_fields"] == {
|
|
"user": "john",
|
|
"auth_list_command": "ha auth list",
|
|
}
|
|
assert expected_log in caplog.text
|
|
|
|
|
|
async def test_list_users(
|
|
api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
|
|
):
|
|
"""Test list users api."""
|
|
ha_ws_client.async_send_command.return_value = LIST_USERS_RESPONSE
|
|
resp = await api_client.get("/auth/list")
|
|
assert resp.status == 200
|
|
result = await resp.json()
|
|
assert result["data"]["users"] == [
|
|
{
|
|
"username": "test",
|
|
"name": "Test",
|
|
"is_owner": True,
|
|
"is_active": True,
|
|
"local_only": False,
|
|
"group_ids": ["system-admin"],
|
|
},
|
|
]
|
|
|
|
|
|
async def test_list_users_ws_error(
|
|
api_client: TestClient,
|
|
ha_ws_client: AsyncMock,
|
|
caplog: pytest.LogCaptureFixture,
|
|
):
|
|
"""Test WS error when listing users via API."""
|
|
ha_ws_client.async_send_command = AsyncMock(
|
|
side_effect=HomeAssistantWSError("fail")
|
|
)
|
|
resp = await api_client.get("/auth/list")
|
|
assert resp.status == 500
|
|
result = await resp.json()
|
|
assert result == {
|
|
"result": "error",
|
|
"message": "Can't request listing users on Home Assistant. Check Supervisor logs for details",
|
|
"error_key": "auth_list_users_error",
|
|
}
|
|
assert "Can't request listing users on Home Assistant: fail" in caplog.text
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("field", "api_client"),
|
|
[("username", TEST_ADDON_SLUG), ("user", TEST_ADDON_SLUG)],
|
|
indirect=["api_client"],
|
|
)
|
|
async def test_auth_json_success(
|
|
api_client: TestClient,
|
|
mock_check_login: AsyncMock,
|
|
install_addon_ssh: Addon,
|
|
field: str,
|
|
):
|
|
"""Test successful JSON auth."""
|
|
mock_check_login.return_value = True
|
|
resp = await api_client.post("/auth", json={field: "test", "password": "pass"})
|
|
assert resp.status == 200
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("user", "password", "api_client"),
|
|
[
|
|
(None, "password", TEST_ADDON_SLUG),
|
|
("user", None, TEST_ADDON_SLUG),
|
|
],
|
|
indirect=["api_client"],
|
|
)
|
|
async def test_auth_json_failure_none(
|
|
api_client: TestClient,
|
|
mock_check_login: AsyncMock,
|
|
install_addon_ssh: Addon,
|
|
user: str | None,
|
|
password: str | None,
|
|
):
|
|
"""Test failed JSON auth with none user or password."""
|
|
mock_check_login.return_value = True
|
|
resp = await api_client.post("/auth", json={"username": user, "password": password})
|
|
assert resp.status == 401
|
|
assert (
|
|
resp.headers["WWW-Authenticate"]
|
|
== 'Basic realm="Home Assistant Authentication"'
|
|
)
|
|
body = await resp.json()
|
|
assert body["message"] == "Username and password must be strings"
|
|
assert body["error_key"] == "auth_invalid_non_string_value_error"
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_json_invalid_credentials(
|
|
api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
|
|
):
|
|
"""Test failed JSON auth due to invalid credentials."""
|
|
mock_check_login.return_value = False
|
|
resp = await api_client.post(
|
|
"/auth", json={"username": "test", "password": "wrong"}
|
|
)
|
|
assert WWW_AUTHENTICATE not in resp.headers
|
|
assert resp.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_json_empty_body(api_client: TestClient, install_addon_ssh: Addon):
|
|
"""Test JSON auth with empty body."""
|
|
resp = await api_client.post(
|
|
"/auth", data="", headers={"Content-Type": "application/json"}
|
|
)
|
|
assert resp.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_json_invalid_json(api_client: TestClient, install_addon_ssh: Addon):
|
|
"""Test JSON auth with malformed JSON."""
|
|
resp = await api_client.post(
|
|
"/auth", data="{not json}", headers={"Content-Type": "application/json"}
|
|
)
|
|
assert resp.status == 400
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_urlencoded_success(
|
|
api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
|
|
):
|
|
"""Test successful URL-encoded auth."""
|
|
mock_check_login.return_value = True
|
|
resp = await api_client.post(
|
|
"/auth",
|
|
data="username=test&password=pass",
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
assert resp.status == 200
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_urlencoded_failure(
|
|
api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
|
|
):
|
|
"""Test URL-encoded auth with invalid credentials."""
|
|
mock_check_login.return_value = False
|
|
resp = await api_client.post(
|
|
"/auth",
|
|
data="username=test&password=fail",
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
assert WWW_AUTHENTICATE not in resp.headers
|
|
assert resp.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_unsupported_content_type(
|
|
api_client: TestClient, install_addon_ssh: Addon
|
|
):
|
|
"""Test auth with unsupported content type."""
|
|
resp = await api_client.post(
|
|
"/auth", data="something", headers={"Content-Type": "text/plain"}
|
|
)
|
|
assert "Basic realm" in resp.headers[WWW_AUTHENTICATE]
|
|
assert resp.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_basic_auth(
|
|
api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
|
|
):
|
|
"""Test auth with BasicAuth header."""
|
|
mock_check_login.return_value = True
|
|
resp = await api_client.post(
|
|
"/auth", headers={"Authorization": "Basic dGVzdDpwYXNz"}
|
|
)
|
|
assert resp.status == 200
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_basic_auth_failure(
|
|
api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
|
|
):
|
|
"""Test auth with BasicAuth header and failure."""
|
|
mock_check_login.return_value = False
|
|
resp = await api_client.post(
|
|
"/auth", headers={"Authorization": "Basic dGVzdDpwYXNz"}
|
|
)
|
|
assert resp.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
async def test_auth_bearer_token_returns_401(
|
|
api_client: TestClient, install_addon_ssh: Addon
|
|
):
|
|
"""Test that a Bearer token in Authorization header returns 401, not 500."""
|
|
resp = await api_client.post(
|
|
"/auth", headers={"Authorization": "Bearer sometoken123"}
|
|
)
|
|
assert "Basic realm" in resp.headers[WWW_AUTHENTICATE]
|
|
assert resp.status == 401
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", ["local_example"], indirect=True)
|
|
async def test_auth_addon_no_auth_access(
|
|
api_client: TestClient, install_addon_example: Addon
|
|
):
|
|
"""Test auth where add-on is not allowed to access auth API."""
|
|
resp = await api_client.post("/auth", json={"username": "test", "password": "pass"})
|
|
assert resp.status == 403
|
|
|
|
|
|
async def test_non_addon_token_no_auth_access(api_client: TestClient):
|
|
"""Test auth where add-on is not allowed to access auth API."""
|
|
resp = await api_client.post("/auth", json={"username": "test", "password": "pass"})
|
|
assert resp.status == 403
|
|
|
|
|
|
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
|
|
@pytest.mark.usefixtures("install_addon_ssh")
|
|
async def test_auth_backend_login_failure(api_client: TestClient):
|
|
"""Test backend login failure on auth."""
|
|
with (
|
|
patch.object(HomeAssistantAPI, "check_api_state", return_value=True),
|
|
patch.object(
|
|
HomeAssistantAPI, "make_request", side_effect=HomeAssistantAPIError("fail")
|
|
),
|
|
):
|
|
resp = await api_client.post(
|
|
"/auth", json={"username": "test", "password": "pass"}
|
|
)
|
|
assert resp.status == 500
|
|
body = await resp.json()
|
|
assert (
|
|
body["message"]
|
|
== "Unable to validate authentication details with Home Assistant. Check Supervisor logs for details"
|
|
)
|
|
assert body["error_key"] == "auth_home_assistant_api_validation_error"
|
|
assert "extra_fields" not in body
|