mirror of
https://github.com/home-assistant/core.git
synced 2026-04-24 18:59:22 +01:00
Add MFA login flow support for cloud component (#132497)
* Add MFA login flow support for cloud component * Add tests for cloud MFA login * Update code to reflect used package changes * Update code to use underlying package changes * Remove unused change * Fix login required parameters * Fix parameter validation * Use cv.has_at_least_one_key for param validation --------- Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
@@ -8,7 +8,12 @@ from unittest.mock import AsyncMock, MagicMock, Mock, patch
|
||||
|
||||
import aiohttp
|
||||
from hass_nabucasa import thingtalk
|
||||
from hass_nabucasa.auth import Unauthenticated, UnknownError
|
||||
from hass_nabucasa.auth import (
|
||||
InvalidTotpCode,
|
||||
MFARequired,
|
||||
Unauthenticated,
|
||||
UnknownError,
|
||||
)
|
||||
from hass_nabucasa.const import STATE_CONNECTED
|
||||
from hass_nabucasa.voice import TTS_VOICES
|
||||
import pytest
|
||||
@@ -378,6 +383,128 @@ async def test_login_view_invalid_credentials(
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
|
||||
|
||||
async def test_login_view_mfa_required(
|
||||
cloud: MagicMock,
|
||||
setup_cloud: None,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test logging in when MFA is required."""
|
||||
cloud_client = await hass_client()
|
||||
cloud.login.side_effect = MFARequired(mfa_tokens={"session": "tokens"})
|
||||
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login", json={"email": "my_username", "password": "my_password"}
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
res = await req.json()
|
||||
assert res["code"] == "mfarequired"
|
||||
|
||||
|
||||
async def test_login_view_mfa_required_tokens_missing(
|
||||
cloud: MagicMock,
|
||||
setup_cloud: None,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test logging in when MFA is required, code is provided, but session tokens are missing."""
|
||||
cloud_client = await hass_client()
|
||||
cloud.login.side_effect = MFARequired(mfa_tokens={})
|
||||
|
||||
# Login with password and get MFA required error
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login", json={"email": "my_username", "password": "my_password"}
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
res = await req.json()
|
||||
assert res["code"] == "mfarequired"
|
||||
|
||||
# Login with TOTP code and get MFA expired error
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login",
|
||||
json={"email": "my_username", "code": "123346"},
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.BAD_REQUEST
|
||||
res = await req.json()
|
||||
assert res["code"] == "mfaexpiredornotstarted"
|
||||
|
||||
|
||||
async def test_login_view_mfa_password_and_totp_provided(
|
||||
cloud: MagicMock,
|
||||
setup_cloud: None,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test logging in when password and TOTP code provided at once."""
|
||||
cloud_client = await hass_client()
|
||||
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login",
|
||||
json={"email": "my_username", "password": "my_password", "code": "123346"},
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.BAD_REQUEST
|
||||
|
||||
|
||||
async def test_login_view_invalid_totp_code(
|
||||
cloud: MagicMock,
|
||||
setup_cloud: None,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test logging in when MFA is required and invalid code is provided."""
|
||||
cloud_client = await hass_client()
|
||||
cloud.login.side_effect = MFARequired(mfa_tokens={"session": "tokens"})
|
||||
cloud.login_verify_totp.side_effect = InvalidTotpCode
|
||||
|
||||
# Login with password and get MFA required error
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login", json={"email": "my_username", "password": "my_password"}
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
res = await req.json()
|
||||
assert res["code"] == "mfarequired"
|
||||
|
||||
# Login with TOTP code and get invalid TOTP code error
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login",
|
||||
json={"email": "my_username", "code": "123346"},
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.BAD_REQUEST
|
||||
res = await req.json()
|
||||
assert res["code"] == "invalidtotpcode"
|
||||
|
||||
|
||||
async def test_login_view_valid_totp_provided(
|
||||
cloud: MagicMock,
|
||||
setup_cloud: None,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test logging in with valid TOTP code."""
|
||||
cloud_client = await hass_client()
|
||||
cloud.login.side_effect = MFARequired(mfa_tokens={"session": "tokens"})
|
||||
|
||||
# Login with password and get MFA required error
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login", json={"email": "my_username", "password": "my_password"}
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||
res = await req.json()
|
||||
assert res["code"] == "mfarequired"
|
||||
|
||||
# Login with TOTP code and get success response
|
||||
req = await cloud_client.post(
|
||||
"/api/cloud/login",
|
||||
json={"email": "my_username", "code": "123346"},
|
||||
)
|
||||
|
||||
assert req.status == HTTPStatus.OK
|
||||
result = await req.json()
|
||||
assert result == {"success": True, "cloud_pipeline": None}
|
||||
|
||||
|
||||
async def test_login_view_unknown_error(
|
||||
cloud: MagicMock,
|
||||
setup_cloud: None,
|
||||
|
||||
Reference in New Issue
Block a user