"""Tests for the Backup integration.""" from __future__ import annotations import asyncio from collections.abc import Generator from io import StringIO import json from typing import Any from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, mock_open, patch import pytest from homeassistant.components.backup import ( DOMAIN, AgentBackup, BackupAgentPlatformProtocol, BackupManager, BackupPlatformProtocol, Folder, backup as local_backup_platform, ) from homeassistant.components.backup.const import DATA_MANAGER from homeassistant.components.backup.manager import ( BackupManagerState, CoreBackupReaderWriter, CreateBackupEvent, CreateBackupStage, CreateBackupState, NewBackup, WrittenBackup, ) from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError from homeassistant.setup import async_setup_component from .common import ( LOCAL_AGENT_ID, TEST_BACKUP_ABC123, TEST_BACKUP_DEF456, BackupAgentTest, ) from tests.common import MockPlatform, mock_platform from tests.typing import ClientSessionGenerator, WebSocketGenerator _EXPECTED_FILES = [ "test.txt", ".storage", "backups", "backups/not_backup", "tmp_backups", "tmp_backups/not_backup", ] _EXPECTED_FILES_WITH_DATABASE = { True: [*_EXPECTED_FILES, "home-assistant_v2.db"], False: _EXPECTED_FILES, } async def _setup_backup_platform( hass: HomeAssistant, *, domain: str = "some_domain", platform: BackupPlatformProtocol | BackupAgentPlatformProtocol | None = None, ) -> None: """Set up a mock domain.""" mock_platform(hass, f"{domain}.backup", platform or MockPlatform()) assert await async_setup_component(hass, domain, {}) await hass.async_block_till_done() @pytest.fixture(autouse=True) def mock_delay_save() -> Generator[None]: """Mock the delay save constant.""" with patch("homeassistant.components.backup.store.STORE_DELAY_SAVE", 0): yield @pytest.fixture(name="generate_backup_id") def generate_backup_id_fixture() -> Generator[MagicMock]: """Mock generate backup id.""" with patch("homeassistant.components.backup.manager._generate_backup_id") as mock: mock.return_value = "abc123" yield mock @pytest.mark.usefixtures("mock_backup_generation") async def test_async_create_backup( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mocked_json_bytes: Mock, mocked_tarfile: Mock, ) -> None: """Test create backup.""" assert await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() new_backup = NewBackup(backup_job_id="time-123") backup_task = AsyncMock( return_value=WrittenBackup( backup=TEST_BACKUP_ABC123, open_stream=AsyncMock(), release_stream=AsyncMock(), ), )() # call it so that it can be awaited with patch( "homeassistant.components.backup.manager.CoreBackupReaderWriter.async_create_backup", return_value=(new_backup, backup_task), ) as create_backup: await hass.services.async_call( DOMAIN, "create", blocking=True, ) assert create_backup.called assert create_backup.call_args == call( agent_ids=["backup.local"], backup_name="Core 2025.1.0", include_addons=None, include_all_addons=False, include_database=True, include_folders=None, include_homeassistant=True, on_progress=ANY, password=None, ) async def test_async_create_backup_when_backing_up(hass: HomeAssistant) -> None: """Test generate backup.""" manager = BackupManager(hass, CoreBackupReaderWriter(hass)) manager.last_event = CreateBackupEvent( stage=None, state=CreateBackupState.IN_PROGRESS ) with pytest.raises(HomeAssistantError, match="Backup manager busy"): await manager.async_create_backup( agent_ids=[LOCAL_AGENT_ID], include_addons=[], include_all_addons=False, include_database=True, include_folders=[], include_homeassistant=True, name=None, password=None, ) @pytest.mark.parametrize( ("parameters", "expected_error"), [ ({"agent_ids": []}, "At least one agent must be selected"), ({"agent_ids": ["non_existing"]}, "Invalid agent selected"), ( {"include_addons": ["ssl"], "include_all_addons": True}, "Cannot include all addons and specify specific addons", ), ({"include_homeassistant": False}, "Home Assistant must be included in backup"), ], ) async def test_create_backup_wrong_parameters( hass: HomeAssistant, hass_ws_client: WebSocketGenerator, parameters: dict[str, Any], expected_error: str, ) -> None: """Test create backup with wrong parameters.""" assert await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) default_parameters = { "agent_ids": [LOCAL_AGENT_ID], "include_addons": [], "include_all_addons": False, "include_database": True, "include_folders": [], "include_homeassistant": True, } await ws_client.send_json_auto_id( {"type": "backup/generate"} | default_parameters | parameters ) result = await ws_client.receive_json() assert result["success"] is False assert result["error"]["code"] == "home_assistant_error" assert result["error"]["message"] == expected_error @pytest.mark.usefixtures("mock_backup_generation") @pytest.mark.parametrize( ("agent_ids", "backup_directory", "temp_file_unlink_call_count"), [ ([LOCAL_AGENT_ID], "backups", 0), (["test.remote"], "tmp_backups", 1), ([LOCAL_AGENT_ID, "test.remote"], "backups", 0), ], ) @pytest.mark.parametrize( "params", [ {}, {"include_database": True, "name": "abc123"}, {"include_database": False}, {"password": "pass123"}, ], ) async def test_async_initiate_backup( hass: HomeAssistant, hass_ws_client: WebSocketGenerator, caplog: pytest.LogCaptureFixture, mocked_json_bytes: Mock, mocked_tarfile: Mock, generate_backup_id: MagicMock, path_glob: MagicMock, params: dict[str, Any], agent_ids: list[str], backup_directory: str, temp_file_unlink_call_count: int, ) -> None: """Test generate backup.""" local_agent = local_backup_platform.CoreLocalBackupAgent(hass) remote_agent = BackupAgentTest("remote", backups=[]) agents = { f"backup.{local_agent.name}": local_agent, f"test.{remote_agent.name}": remote_agent, } with patch( "homeassistant.components.backup.backup.async_get_backup_agents" ) as core_get_backup_agents: core_get_backup_agents.return_value = [local_agent] await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() await _setup_backup_platform( hass, domain="test", platform=Mock( async_get_backup_agents=AsyncMock(return_value=[remote_agent]), spec_set=BackupAgentPlatformProtocol, ), ) ws_client = await hass_ws_client(hass) include_database = params.get("include_database", True) name = params.get("name", "Core 2025.1.0") password = params.get("password") path_glob.return_value = [] await ws_client.send_json_auto_id({"type": "backup/info"}) result = await ws_client.receive_json() assert result["success"] is True assert result["result"] == { "backups": [], "agent_errors": {}, "last_attempted_strategy_backup": None, "last_completed_strategy_backup": None, } await ws_client.send_json_auto_id({"type": "backup/subscribe_events"}) result = await ws_client.receive_json() assert result["event"] == {"manager_state": BackupManagerState.IDLE} result = await ws_client.receive_json() assert result["success"] is True with ( patch("pathlib.Path.open", mock_open(read_data=b"test")), patch("pathlib.Path.unlink") as unlink_mock, ): await ws_client.send_json_auto_id( {"type": "backup/generate", "agent_ids": agent_ids} | params ) result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": None, "state": CreateBackupState.IN_PROGRESS, } result = await ws_client.receive_json() assert result["success"] is True backup_id = result["result"]["backup_job_id"] assert backup_id == generate_backup_id.return_value await hass.async_block_till_done() result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": CreateBackupStage.HOME_ASSISTANT, "state": CreateBackupState.IN_PROGRESS, } result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": CreateBackupStage.UPLOAD_TO_AGENTS, "state": CreateBackupState.IN_PROGRESS, } result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": None, "state": CreateBackupState.COMPLETED, } result = await ws_client.receive_json() assert result["event"] == {"manager_state": BackupManagerState.IDLE} assert unlink_mock.call_count == temp_file_unlink_call_count assert mocked_json_bytes.call_count == 1 backup_json_dict = mocked_json_bytes.call_args[0][0] assert isinstance(backup_json_dict, dict) assert backup_json_dict == { "compressed": True, "date": ANY, "homeassistant": { "exclude_database": not include_database, "version": "2025.1.0", }, "name": name, "protected": bool(password), "slug": ANY, "type": "partial", "version": 2, } await ws_client.send_json_auto_id( {"type": "backup/details", "backup_id": backup_id} ) result = await ws_client.receive_json() backup_data = result["result"]["backup"] backup_agent_ids = backup_data.pop("agent_ids") assert backup_agent_ids == agent_ids backup = AgentBackup.from_dict(backup_data) assert backup == AgentBackup( addons=[], backup_id=ANY, database_included=include_database, date=ANY, folders=[], homeassistant_included=True, homeassistant_version="2025.1.0", name=name, protected=bool(password), size=ANY, ) for agent_id in agent_ids: agent = agents[agent_id] assert len(agent._backups) == 1 agent_backup = agent._backups[backup.backup_id] assert agent_backup.backup_id == backup.backup_id assert agent_backup.date == backup.date assert agent_backup.name == backup.name assert agent_backup.protected == backup.protected assert agent_backup.size == backup.size outer_tar = mocked_tarfile.return_value core_tar = outer_tar.create_inner_tar.return_value.__enter__.return_value expected_files = [call(hass.config.path(), arcname="data", recursive=False)] + [ call(file, arcname=f"data/{file}", recursive=False) for file in _EXPECTED_FILES_WITH_DATABASE[include_database] ] assert core_tar.add.call_args_list == expected_files tar_file_path = str(mocked_tarfile.call_args_list[0][0][0]) backup_directory = hass.config.path(backup_directory) assert tar_file_path == f"{backup_directory}/{backup.backup_id}.tar" @pytest.mark.usefixtures("mock_backup_generation") async def test_async_initiate_backup_with_agent_error( hass: HomeAssistant, hass_ws_client: WebSocketGenerator, mocked_json_bytes: Mock, mocked_tarfile: Mock, generate_backup_id: MagicMock, path_glob: MagicMock, hass_storage: dict[str, Any], ) -> None: """Test generate backup.""" agent_ids = [LOCAL_AGENT_ID, "test.remote"] local_agent = local_backup_platform.CoreLocalBackupAgent(hass) remote_agent = BackupAgentTest("remote", backups=[]) with patch( "homeassistant.components.backup.backup.async_get_backup_agents" ) as core_get_backup_agents: core_get_backup_agents.return_value = [local_agent] await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() await _setup_backup_platform( hass, domain="test", platform=Mock( async_get_backup_agents=AsyncMock(return_value=[remote_agent]), spec_set=BackupAgentPlatformProtocol, ), ) ws_client = await hass_ws_client(hass) path_glob.return_value = [] await ws_client.send_json_auto_id({"type": "backup/info"}) result = await ws_client.receive_json() assert result["success"] is True assert result["result"] == { "backups": [], "agent_errors": {}, "last_attempted_strategy_backup": None, "last_completed_strategy_backup": None, } await ws_client.send_json_auto_id({"type": "backup/subscribe_events"}) result = await ws_client.receive_json() assert result["event"] == {"manager_state": BackupManagerState.IDLE} result = await ws_client.receive_json() assert result["success"] is True with ( patch("pathlib.Path.open", mock_open(read_data=b"test")), patch.object( remote_agent, "async_upload_backup", side_effect=Exception("Test exception") ), ): await ws_client.send_json_auto_id( {"type": "backup/generate", "agent_ids": agent_ids} ) result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": None, "state": CreateBackupState.IN_PROGRESS, } result = await ws_client.receive_json() assert result["success"] is True backup_id = result["result"]["backup_job_id"] assert backup_id == generate_backup_id.return_value await hass.async_block_till_done() result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": CreateBackupStage.HOME_ASSISTANT, "state": CreateBackupState.IN_PROGRESS, } result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": CreateBackupStage.UPLOAD_TO_AGENTS, "state": CreateBackupState.IN_PROGRESS, } result = await ws_client.receive_json() assert result["event"] == { "manager_state": BackupManagerState.CREATE_BACKUP, "stage": None, "state": CreateBackupState.COMPLETED, } result = await ws_client.receive_json() assert result["event"] == {"manager_state": BackupManagerState.IDLE} expected_backup_data = { "addons": [], "agent_ids": ["backup.local"], "backup_id": "abc123", "database_included": True, "date": ANY, "failed_agent_ids": ["test.remote"], "folders": [], "homeassistant_included": True, "homeassistant_version": "2025.1.0", "name": "Core 2025.1.0", "protected": False, "size": 123, "with_strategy_settings": False, } await ws_client.send_json_auto_id( {"type": "backup/details", "backup_id": backup_id} ) result = await ws_client.receive_json() assert result["result"] == { "agent_errors": {}, "backup": expected_backup_data, } await ws_client.send_json_auto_id({"type": "backup/info"}) result = await ws_client.receive_json() assert result["result"] == { "agent_errors": {}, "backups": [expected_backup_data], "last_attempted_strategy_backup": None, "last_completed_strategy_backup": None, } await hass.async_block_till_done() assert hass_storage[DOMAIN]["data"]["backups"] == [ { "backup_id": "abc123", "failed_agent_ids": ["test.remote"], "with_strategy_settings": False, } ] async def test_loading_platforms( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, ) -> None: """Test loading backup platforms.""" manager = BackupManager(hass, CoreBackupReaderWriter(hass)) assert not manager.platforms await _setup_backup_platform( hass, platform=Mock( async_pre_backup=AsyncMock(), async_post_backup=AsyncMock(), async_get_backup_agents=AsyncMock(), ), ) await manager.load_platforms() await hass.async_block_till_done() assert len(manager.platforms) == 1 assert "Loaded 1 platforms" in caplog.text @pytest.mark.parametrize( "platform_mock", [ Mock(async_pre_backup=AsyncMock(), spec=["async_pre_backup"]), Mock(async_post_backup=AsyncMock(), spec=["async_post_backup"]), Mock(spec=[]), ], ) async def test_not_loading_bad_platforms( hass: HomeAssistant, platform_mock: Mock, ) -> None: """Test not loading bad backup platforms.""" await _setup_backup_platform( hass, domain="test", platform=platform_mock, ) assert await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() assert platform_mock.mock_calls == [] async def test_exception_platform_pre( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test exception in pre step.""" async def _mock_step(hass: HomeAssistant) -> None: raise HomeAssistantError("Test exception") remote_agent = BackupAgentTest("remote", backups=[]) await _setup_backup_platform( hass, domain="test", platform=Mock( async_pre_backup=_mock_step, async_post_backup=AsyncMock(), async_get_backup_agents=AsyncMock(return_value=[remote_agent]), ), ) assert await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() await hass.services.async_call( DOMAIN, "create", blocking=True, ) assert "Generating backup failed" in caplog.text assert "Test exception" in caplog.text @pytest.mark.usefixtures("mock_backup_generation") async def test_exception_platform_post( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, ) -> None: """Test exception in post step.""" async def _mock_step(hass: HomeAssistant) -> None: raise HomeAssistantError("Test exception") remote_agent = BackupAgentTest("remote", backups=[]) await _setup_backup_platform( hass, domain="test", platform=Mock( async_pre_backup=AsyncMock(), async_post_backup=_mock_step, async_get_backup_agents=AsyncMock(return_value=[remote_agent]), ), ) assert await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() await hass.services.async_call( DOMAIN, "create", blocking=True, ) assert "Generating backup failed" in caplog.text assert "Test exception" in caplog.text @pytest.mark.parametrize( ( "agent_id_params", "open_call_count", "move_call_count", "move_path_names", "remote_agent_backups", "remote_agent_backup_data", "temp_file_unlink_call_count", ), [ ( "agent_id=backup.local&agent_id=test.remote", 2, 1, ["abc123.tar"], {TEST_BACKUP_ABC123.backup_id: TEST_BACKUP_ABC123}, b"test", 0, ), ( "agent_id=backup.local", 1, 1, ["abc123.tar"], {}, None, 0, ), ( "agent_id=test.remote", 2, 0, [], {TEST_BACKUP_ABC123.backup_id: TEST_BACKUP_ABC123}, b"test", 1, ), ], ) async def test_receive_backup( hass: HomeAssistant, hass_client: ClientSessionGenerator, agent_id_params: str, open_call_count: int, move_call_count: int, move_path_names: list[str], remote_agent_backups: dict[str, AgentBackup], remote_agent_backup_data: bytes | None, temp_file_unlink_call_count: int, ) -> None: """Test receive backup and upload to the local and a remote agent.""" remote_agent = BackupAgentTest("remote", backups=[]) await _setup_backup_platform( hass, domain="test", platform=Mock( async_get_backup_agents=AsyncMock(return_value=[remote_agent]), spec_set=BackupAgentPlatformProtocol, ), ) assert await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() client = await hass_client() upload_data = "test" open_mock = mock_open(read_data=upload_data.encode(encoding="utf-8")) with ( patch("pathlib.Path.open", open_mock), patch("shutil.move") as move_mock, patch( "homeassistant.components.backup.manager.read_backup", return_value=TEST_BACKUP_ABC123, ), patch("pathlib.Path.unlink") as unlink_mock, ): resp = await client.post( f"/api/backup/upload?{agent_id_params}", data={"file": StringIO(upload_data)}, ) await hass.async_block_till_done() assert resp.status == 201 assert open_mock.call_count == open_call_count assert move_mock.call_count == move_call_count for index, name in enumerate(move_path_names): assert move_mock.call_args_list[index].args[1].name == name assert remote_agent._backups == remote_agent_backups assert remote_agent._backup_data == remote_agent_backup_data assert unlink_mock.call_count == temp_file_unlink_call_count @pytest.mark.usefixtures("mock_backup_generation") async def test_receive_backup_busy_manager( hass: HomeAssistant, hass_client: ClientSessionGenerator, hass_ws_client: WebSocketGenerator, ) -> None: """Test receive backup with a busy manager.""" assert await async_setup_component(hass, DOMAIN, {}) await hass.async_block_till_done() client = await hass_client() ws_client = await hass_ws_client(hass) upload_data = "test" await ws_client.send_json_auto_id({"type": "backup/subscribe_events"}) result = await ws_client.receive_json() assert result["event"] == {"manager_state": "idle"} result = await ws_client.receive_json() assert result["success"] is True new_backup = NewBackup(backup_job_id="time-123") backup_task: asyncio.Future[WrittenBackup] = asyncio.Future() with patch( "homeassistant.components.backup.manager.CoreBackupReaderWriter.async_create_backup", return_value=(new_backup, backup_task), ) as create_backup: await ws_client.send_json_auto_id( {"type": "backup/generate", "agent_ids": ["backup.local"]} ) result = await ws_client.receive_json() assert result["event"] == { "manager_state": "create_backup", "stage": None, "state": "in_progress", } result = await ws_client.receive_json() assert result["success"] is True assert result["result"] == {"backup_job_id": "time-123"} assert create_backup.call_count == 1 resp = await client.post( "/api/backup/upload?agent_id=backup.local", data={"file": StringIO(upload_data)}, ) assert resp.status == 500 assert ( await resp.text() == "Can't upload backup file: Backup manager busy: create_backup" ) # finish the backup backup_task.set_result( WrittenBackup( backup=TEST_BACKUP_ABC123, open_stream=AsyncMock(), release_stream=AsyncMock(), ) ) await hass.async_block_till_done() @pytest.mark.parametrize( ("agent_id", "password", "restore_database", "restore_homeassistant", "dir"), [ (LOCAL_AGENT_ID, None, True, False, "backups"), (LOCAL_AGENT_ID, "abc123", False, True, "backups"), ("test.remote", None, True, True, "tmp_backups"), ], ) async def test_async_trigger_restore( hass: HomeAssistant, agent_id: str, password: str | None, restore_database: bool, restore_homeassistant: bool, dir: str, ) -> None: """Test trigger restore.""" manager = BackupManager(hass, CoreBackupReaderWriter(hass)) hass.data[DATA_MANAGER] = manager await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform) await _setup_backup_platform( hass, domain="test", platform=Mock( async_get_backup_agents=AsyncMock( return_value=[BackupAgentTest("remote", backups=[TEST_BACKUP_ABC123])] ), spec_set=BackupAgentPlatformProtocol, ), ) await manager.load_platforms() local_agent = manager.backup_agents[LOCAL_AGENT_ID] local_agent._backups = {TEST_BACKUP_ABC123.backup_id: TEST_BACKUP_ABC123} local_agent._loaded_backups = True with ( patch("pathlib.Path.exists", return_value=True), patch("pathlib.Path.open"), patch("pathlib.Path.write_text") as mocked_write_text, patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call, patch.object(BackupAgentTest, "async_download_backup") as download_mock, ): download_mock.return_value.__aiter__.return_value = iter((b"backup data",)) await manager.async_restore_backup( TEST_BACKUP_ABC123.backup_id, agent_id=agent_id, password=password, restore_addons=None, restore_database=restore_database, restore_folders=None, restore_homeassistant=restore_homeassistant, ) expected_restore_file = json.dumps( { "path": f"{hass.config.path()}/{dir}/abc123.tar", "password": password, "remove_after_restore": agent_id != LOCAL_AGENT_ID, "restore_database": restore_database, "restore_homeassistant": restore_homeassistant, } ) assert mocked_write_text.call_args[0][0] == expected_restore_file assert mocked_service_call.called @pytest.mark.parametrize( ("parameters", "expected_error"), [ ( {"backup_id": TEST_BACKUP_DEF456.backup_id}, "Backup def456 not found", ), ( {"restore_addons": ["blah"]}, "Addons and folders are not supported in core restore", ), ( {"restore_folders": [Folder.ADDONS]}, "Addons and folders are not supported in core restore", ), ( {"restore_database": False, "restore_homeassistant": False}, "Home Assistant or database must be included in restore", ), ], ) async def test_async_trigger_restore_wrong_parameters( hass: HomeAssistant, parameters: dict[str, Any], expected_error: str ) -> None: """Test trigger restore.""" manager = BackupManager(hass, CoreBackupReaderWriter(hass)) await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform) await manager.load_platforms() local_agent = manager.backup_agents[LOCAL_AGENT_ID] local_agent._backups = {TEST_BACKUP_ABC123.backup_id: TEST_BACKUP_ABC123} local_agent._loaded_backups = True default_parameters = { "agent_id": LOCAL_AGENT_ID, "backup_id": TEST_BACKUP_ABC123.backup_id, "password": None, "restore_addons": None, "restore_database": True, "restore_folders": None, "restore_homeassistant": True, } with ( patch("pathlib.Path.exists", return_value=True), pytest.raises(HomeAssistantError, match=expected_error), ): await manager.async_restore_backup(**(default_parameters | parameters))