"""The tests for the Home Assistant HTTP component.""" from http import HTTPStatus from ipaddress import ip_address import logging import os from unittest.mock import AsyncMock, Mock, mock_open, patch from aiohttp import web from aiohttp.web_exceptions import HTTPUnauthorized from aiohttp.web_middlewares import middleware import pytest from homeassistant.components import http from homeassistant.components.http.ban import ( IP_BANS_FILE, KEY_BAN_MANAGER, KEY_FAILED_LOGIN_ATTEMPTS, process_success_login, setup_bans, ) from homeassistant.components.http.view import request_handler_factory from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.http import KEY_AUTHENTICATED, KEY_HASS from homeassistant.setup import async_setup_component from tests.common import async_get_persistent_notifications from tests.test_util import mock_real_ip from tests.typing import ClientSessionGenerator SUPERVISOR_IP = "1.2.3.4" BANNED_IPS = ["200.201.202.203", "100.64.0.2"] BANNED_IPS_WITH_SUPERVISOR = [*BANNED_IPS, SUPERVISOR_IP] @pytest.fixture(name="hassio_env") def hassio_env_fixture(supervisor_is_connected: AsyncMock): """Fixture to inject hassio env.""" with ( patch.dict(os.environ, {"SUPERVISOR": "127.0.0.1"}), patch.dict(os.environ, {"SUPERVISOR_TOKEN": "123456"}), ): yield @pytest.fixture(autouse=True) def gethostbyaddr_mock(): """Fixture to mock out I/O on getting host by address.""" with patch( "homeassistant.components.http.ban.gethostbyaddr", return_value=("example.com", ["0.0.0.0.in-addr.arpa"], ["0.0.0.0"]), ): yield async def test_access_from_banned_ip( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator ) -> None: """Test accessing to server from banned IP. Both trusted and not.""" app = web.Application() app[KEY_HASS] = hass setup_bans(hass, app, 5) set_real_ip = mock_real_ip(app) with patch( "homeassistant.components.http.ban.load_yaml_config_file", return_value={ banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS }, ): client = await aiohttp_client(app) for remote_addr in BANNED_IPS: set_real_ip(remote_addr) resp = await client.get("/") assert resp.status == HTTPStatus.FORBIDDEN async def test_access_from_banned_ip_with_partially_broken_yaml_file( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture, ) -> None: """Test accessing to server from banned IP. Both trusted and not. We inject some garbage into the yaml file to make sure it can still load the bans. """ app = web.Application() app[KEY_HASS] = hass setup_bans(hass, app, 5) set_real_ip = mock_real_ip(app) data = {banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS} data["5.3.3.3"] = {"banned_at": "garbage"} with patch( "homeassistant.components.http.ban.load_yaml_config_file", return_value=data, ): client = await aiohttp_client(app) for remote_addr in BANNED_IPS: set_real_ip(remote_addr) resp = await client.get("/") assert resp.status == HTTPStatus.FORBIDDEN # Ensure garbage data is ignored set_real_ip("5.3.3.3") resp = await client.get("/") assert resp.status == HTTPStatus.NOT_FOUND assert "Failed to load IP ban" in caplog.text async def test_access_from_banned_ip_with_invalid_ip_entry( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture, ) -> None: """Test that invalid IP addresses in ban file are skipped gracefully. An invalid IP entry (e.g., with typo like "Eo128.199.160.243") should be logged as an error and skipped, allowing valid bans to still load. The test ensures that valid IPs after invalid ones are still processed. """ app = web.Application() app[KEY_HASS] = hass setup_bans(hass, app, 5) set_real_ip = mock_real_ip(app) # Invalid IPs interspersed between valid ones to ensure continue works data = { "Eo128.199.160.243": {"banned_at": "2024-07-06T14:07:46"}, BANNED_IPS[0]: {"banned_at": "2016-11-16T19:20:03"}, "invalidip": {"banned_at": "2024-07-06T14:07:46"}, BANNED_IPS[1]: {"banned_at": "2016-11-16T19:20:03"}, } with patch( "homeassistant.components.http.ban.load_yaml_config_file", return_value=data, ): client = await aiohttp_client(app) # Verify exactly 2 valid IPs were loaded (invalid ones skipped) manager = app[KEY_BAN_MANAGER] assert len(manager.ip_bans_lookup) == len(BANNED_IPS) # Valid banned IPs should still be blocked (even though they came after invalid ones) for remote_addr in BANNED_IPS: set_real_ip(remote_addr) resp = await client.get("/") assert resp.status == HTTPStatus.FORBIDDEN # Non-banned IP should have access set_real_ip("192.168.1.1") resp = await client.get("/") assert resp.status == HTTPStatus.NOT_FOUND # Check that both invalid IP entries were logged for ip in ("Eo128.199.160.243", "invalidip"): assert ( "homeassistant.components.http.ban", logging.ERROR, f"Failed to load IP ban: invalid IP address {ip}", ) in caplog.record_tuples async def test_no_ip_bans_file( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator ) -> None: """Test no ip bans file.""" app = web.Application() app[KEY_HASS] = hass setup_bans(hass, app, 5) set_real_ip = mock_real_ip(app) with patch( "homeassistant.components.http.ban.load_yaml_config_file", side_effect=FileNotFoundError, ): client = await aiohttp_client(app) set_real_ip("4.3.2.1") resp = await client.get("/") assert resp.status == HTTPStatus.NOT_FOUND async def test_failure_loading_ip_bans_file( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator ) -> None: """Test failure loading ip bans file.""" app = web.Application() app[KEY_HASS] = hass setup_bans(hass, app, 5) set_real_ip = mock_real_ip(app) with patch( "homeassistant.components.http.ban.load_yaml_config_file", side_effect=HomeAssistantError, ): client = await aiohttp_client(app) set_real_ip("4.3.2.1") resp = await client.get("/") assert resp.status == HTTPStatus.NOT_FOUND async def test_ip_ban_manager_never_started( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture, ) -> None: """Test we handle the ip ban manager not being started.""" app = web.Application() app[KEY_HASS] = hass setup_bans(hass, app, 5) set_real_ip = mock_real_ip(app) with patch( "homeassistant.components.http.ban.load_yaml_config_file", side_effect=FileNotFoundError, ): client = await aiohttp_client(app) # Mock the manager never being started del app[KEY_BAN_MANAGER] set_real_ip("4.3.2.1") resp = await client.get("/") assert resp.status == HTTPStatus.NOT_FOUND assert "IP Ban middleware loaded but banned IPs not loaded" in caplog.text @pytest.mark.parametrize( ("remote_addr", "bans", "status"), list( zip( BANNED_IPS_WITH_SUPERVISOR, [1, 1, 0], [HTTPStatus.FORBIDDEN, HTTPStatus.FORBIDDEN, HTTPStatus.UNAUTHORIZED], strict=False, ) ), ) async def test_access_from_supervisor_ip( remote_addr, bans, status, hass: HomeAssistant, aiohttp_client: ClientSessionGenerator, hassio_env, resolution_info: AsyncMock, ) -> None: """Test accessing to server from supervisor IP.""" app = web.Application() app[KEY_HASS] = hass async def unauth_handler(request): """Return a mock web response.""" raise HTTPUnauthorized app.router.add_get("/", unauth_handler) setup_bans(hass, app, 1) mock_real_ip(app)(remote_addr) with patch( "homeassistant.components.http.ban.load_yaml_config_file", return_value={}, ): client = await aiohttp_client(app) manager = app[KEY_BAN_MANAGER] assert await async_setup_component(hass, "hassio", {"hassio": {}}) m_open = mock_open() with ( patch.dict(os.environ, {"SUPERVISOR": SUPERVISOR_IP}), patch("homeassistant.components.http.ban.open", m_open, create=True), ): resp = await client.get("/") assert resp.status == HTTPStatus.UNAUTHORIZED assert len(manager.ip_bans_lookup) == bans assert m_open.call_count == bans # second request should be forbidden if banned resp = await client.get("/") assert resp.status == status assert len(manager.ip_bans_lookup) == bans async def test_ban_middleware_not_loaded_by_config(hass: HomeAssistant) -> None: """Test accessing to server from banned IP when feature is off.""" with patch("homeassistant.components.http.setup_bans") as mock_setup: await async_setup_component( hass, "http", {"http": {http.CONF_IP_BAN_ENABLED: False}} ) assert len(mock_setup.mock_calls) == 0 async def test_ban_middleware_loaded_by_default(hass: HomeAssistant) -> None: """Test accessing to server from banned IP when feature is off.""" with patch("homeassistant.components.http.setup_bans") as mock_setup: await async_setup_component(hass, "http", {"http": {}}) assert len(mock_setup.mock_calls) == 1 async def test_ip_bans_file_creation( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator, caplog: pytest.LogCaptureFixture, ) -> None: """Testing if banned IP file created.""" app = web.Application() app[KEY_HASS] = hass async def unauth_handler(request): """Return a mock web response.""" raise HTTPUnauthorized app.router.add_get("/example", unauth_handler) setup_bans(hass, app, 2) mock_real_ip(app)("200.201.202.204") with patch( "homeassistant.components.http.ban.load_yaml_config_file", return_value={ banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS }, ): client = await aiohttp_client(app) manager = app[KEY_BAN_MANAGER] m_open = mock_open() with patch("homeassistant.components.http.ban.open", m_open, create=True): resp = await client.get("/example") assert resp.status == HTTPStatus.UNAUTHORIZED assert len(manager.ip_bans_lookup) == len(BANNED_IPS) assert m_open.call_count == 0 resp = await client.get("/example") assert resp.status == HTTPStatus.UNAUTHORIZED assert len(manager.ip_bans_lookup) == len(BANNED_IPS) + 1 m_open.assert_called_once_with( hass.config.path(IP_BANS_FILE), "a", encoding="utf8" ) resp = await client.get("/example") assert resp.status == HTTPStatus.FORBIDDEN assert m_open.call_count == 1 notifications = async_get_persistent_notifications(hass) assert len(notifications) == 2 assert ( notifications["http-login"]["message"] == "Login attempt or request with invalid authentication from example.com (200.201.202.204). See the log for details." ) assert ( "Login attempt or request with invalid authentication from example.com (200.201.202.204). Requested URL: '/example'." in caplog.text ) async def test_failed_login_attempts_counter( hass: HomeAssistant, aiohttp_client: ClientSessionGenerator ) -> None: """Testing if failed login attempts counter increased.""" app = web.Application() app[KEY_HASS] = hass async def auth_handler(request): """Return 200 status code.""" return None, 200 async def auth_true_handler(request): """Return 200 status code.""" process_success_login(request) return None, 200 app.router.add_get( "/auth_true", request_handler_factory(hass, Mock(requires_auth=True), auth_true_handler), ) app.router.add_get( "/auth_false", request_handler_factory(hass, Mock(requires_auth=True), auth_handler), ) app.router.add_get( "/", request_handler_factory(hass, Mock(requires_auth=False), auth_handler) ) setup_bans(hass, app, 5) remote_ip = ip_address("200.201.202.204") mock_real_ip(app)("200.201.202.204") @middleware async def mock_auth(request, handler): """Mock auth middleware.""" if "auth_true" in request.path: request[KEY_AUTHENTICATED] = True else: request[KEY_AUTHENTICATED] = False return await handler(request) app.middlewares.append(mock_auth) client = await aiohttp_client(app) resp = await client.get("/auth_false") assert resp.status == HTTPStatus.UNAUTHORIZED assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 1 resp = await client.get("/auth_false") assert resp.status == HTTPStatus.UNAUTHORIZED assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2 resp = await client.get("/") assert resp.status == HTTPStatus.OK assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2 # This used to check that with trusted networks we reset login attempts # We no longer support trusted networks. resp = await client.get("/auth_true") assert resp.status == HTTPStatus.OK assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 0 resp = await client.get("/auth_false") assert resp.status == HTTPStatus.UNAUTHORIZED assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 1 resp = await client.get("/auth_false") assert resp.status == HTTPStatus.UNAUTHORIZED assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2 async def test_single_ban_file_entry( hass: HomeAssistant, ) -> None: """Test that only one item is added to ban file.""" app = web.Application() app[KEY_HASS] = hass async def unauth_handler(request): """Return a mock web response.""" raise HTTPUnauthorized app.router.add_get("/example", unauth_handler) setup_bans(hass, app, 2) mock_real_ip(app)("200.201.202.204") manager = app[KEY_BAN_MANAGER] m_open = mock_open() with patch("homeassistant.components.http.ban.open", m_open, create=True): remote_ip = ip_address("200.201.202.204") await manager.async_add_ban(remote_ip) await manager.async_add_ban(remote_ip) assert m_open.call_count == 1