diff --git a/homeassistant/components/backup/manager.py b/homeassistant/components/backup/manager.py index 520ea8ea38b..a05a55bf4e9 100644 --- a/homeassistant/components/backup/manager.py +++ b/homeassistant/components/backup/manager.py @@ -12,7 +12,7 @@ import hashlib import io from itertools import chain import json -from pathlib import Path, PurePath +from pathlib import Path, PurePath, PureWindowsPath import shutil import sys import tarfile @@ -1957,7 +1957,10 @@ class CoreBackupReaderWriter(BackupReaderWriter): suggested_filename: str, ) -> WrittenBackup: """Receive a backup.""" - temp_file = Path(self.temp_backup_dir, suggested_filename) + safe_filename = PureWindowsPath(suggested_filename).name + if not safe_filename or safe_filename == "..": + safe_filename = "backup.tar" + temp_file = Path(self.temp_backup_dir, safe_filename) async_add_executor_job = self._hass.async_add_executor_job await async_add_executor_job(make_backup_dir, self.temp_backup_dir) diff --git a/tests/components/backup/test_manager.py b/tests/components/backup/test_manager.py index 2a64bcd6843..4d162f12432 100644 --- a/tests/components/backup/test_manager.py +++ b/tests/components/backup/test_manager.py @@ -23,6 +23,7 @@ from unittest.mock import ( patch, ) +from aiohttp import FormData from freezegun.api import FrozenDateTimeFactory import pytest from securetar import SecureTarArchive, SecureTarFile @@ -2013,6 +2014,73 @@ async def test_receive_backup( assert unlink_mock.call_count == temp_file_unlink_call_count +@pytest.mark.parametrize( + ("suggested_filename", "expected_filename"), + [ + ("backup.tar", "backup.tar"), + ("../traversal.tar", "traversal.tar"), + ("../../etc/passwd", "passwd"), + ("subdir/backup.tar", "backup.tar"), + (".", "backup.tar"), + ("..", "backup.tar"), + ("../..", "backup.tar"), + ("..\\traversal.tar", "traversal.tar"), + ("C:\\fakepath\\backup.tar", "backup.tar"), + ], +) +async def test_receive_backup_path_traversal( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + suggested_filename: str, + expected_filename: str, +) -> None: + """Test path traversal in suggested filename is prevented.""" + await setup_backup_integration(hass) + # Make sure we wait for Platform.EVENT and Platform.SENSOR to be fully processed, + # to avoid interference with the Path.open patching below which is used to verify + # that the file is written to the expected location. + await hass.async_block_till_done(True) + client = await hass_client() + + upload_data = "test" + open_mock = mock_open(read_data=upload_data.encode(encoding="utf-8")) + expected_path = Path(hass.config.path("tmp_backups"), expected_filename) + opened_paths: list[Path] = [] + + def track_open(self: Path, *args: Any, **kwargs: Any) -> Any: + opened_paths.append(self) + return open_mock(self, *args, **kwargs) + + with ( + patch("pathlib.Path.open", track_open), + patch("homeassistant.components.backup.manager.make_backup_dir"), + patch("shutil.move"), + patch( + "homeassistant.components.backup.manager.read_backup", + return_value=TEST_BACKUP_ABC123, + ) as read_backup_mock, + patch("pathlib.Path.unlink"), + ): + data = FormData(quote_fields=False) + data.add_field( + "file", + upload_data, + filename=suggested_filename, + content_type="application/octet-stream", + ) + resp = await client.post( + "/api/backup/upload?agent_id=backup.local", + data=data, + ) + await hass.async_block_till_done() + + assert resp.status == 201 + # Verify all file opens went to the expected safe path + assert opened_paths == [expected_path] + # read_backup is called with the temp_file path; verify it's sanitized + read_backup_mock.assert_called_once_with(expected_path) + + async def test_receive_backup_busy_manager( hass: HomeAssistant, hass_client: ClientSessionGenerator,