diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index e2057a61c..df3ca697a 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -20,10 +20,11 @@ from typing import Any, Final, cast import aiohttp from awesomeversion import AwesomeVersion, AwesomeVersionCompareException from deepmerge import Merger -from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path +from securetar import AddFileError, SecureTarFile, atomic_contents_add import voluptuous as vol from voluptuous.humanize import humanize_error +from ..backups.utils import backup_data_filter from ..bus import EventListener from ..const import ( ATTR_ACCESS_TOKEN, @@ -1446,8 +1447,7 @@ class Addon(AddonModel): with tar_file as backup: backup.extractall( path=tmp.name, - members=secure_path(backup), - filter="fully_trusted", + filter=backup_data_filter, ) data = read_json_file(Path(tmp.name, "addon.json")) diff --git a/supervisor/backups/backup.py b/supervisor/backups/backup.py index 9a626aae9..0a2d1e2ec 100644 --- a/supervisor/backups/backup.py +++ b/supervisor/backups/backup.py @@ -18,7 +18,7 @@ import time from typing import Any, Self, cast from awesomeversion import AwesomeVersion, AwesomeVersionCompareException -from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path +from securetar import AddFileError, SecureTarFile, atomic_contents_add import voluptuous as vol from voluptuous.humanize import humanize_error @@ -60,6 +60,7 @@ from ..utils.dt import parse_datetime, utcnow from ..utils.json import json_bytes from ..utils.sentinel import DEFAULT from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, BackupType +from .utils import backup_data_filter from .validate import SCHEMA_BACKUP IGNORED_COMPARISON_FIELDS = {ATTR_PROTECTED, ATTR_CRYPTO, ATTR_DOCKER} @@ -515,8 +516,7 @@ class Backup(JobGroup): with tarfile.open(backup_tarfile, "r:") as tar: tar.extractall( path=tmp.name, - members=secure_path(tar), - filter="fully_trusted", + filter=backup_data_filter, ) return tmp @@ -799,7 +799,8 @@ class Backup(JobGroup): password=self._password, ) as tar_file: tar_file.extractall( - path=origin_dir, members=tar_file, filter="fully_trusted" + path=origin_dir, + filter=backup_data_filter, ) _LOGGER.info("Restore folder %s done", name) except (tarfile.TarError, OSError) as err: diff --git a/supervisor/backups/utils.py b/supervisor/backups/utils.py index e327c9c73..c2a3d74a1 100644 --- a/supervisor/backups/utils.py +++ b/supervisor/backups/utils.py @@ -1,7 +1,11 @@ """Util add-on functions.""" import hashlib +import logging import re +import tarfile + +_LOGGER: logging.Logger = logging.getLogger(__name__) RE_DIGITS = re.compile(r"\d+") @@ -10,3 +14,19 @@ def create_slug(name: str, date_str: str) -> str: """Generate a hash from repository.""" key = f"{date_str} - {name}".lower().encode() return hashlib.sha1(key).hexdigest()[:8] + + +def backup_data_filter( + member: tarfile.TarInfo, dest_path: str +) -> tarfile.TarInfo | None: + """Filter for backup tar extraction. + + Applies tarfile.data_filter for security (rejects dangerous symlinks, + device nodes, resets uid/gid) but skips problematic entries with a + warning instead of aborting the entire extraction. + """ + try: + return tarfile.data_filter(member, dest_path) + except tarfile.FilterError as err: + _LOGGER.warning("Skipping %s: %s", member.name, err) + return None diff --git a/supervisor/homeassistant/module.py b/supervisor/homeassistant/module.py index b8f51e205..01073b410 100644 --- a/supervisor/homeassistant/module.py +++ b/supervisor/homeassistant/module.py @@ -13,10 +13,11 @@ from typing import Any from uuid import UUID from awesomeversion import AwesomeVersion, AwesomeVersionException -from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path +from securetar import AddFileError, SecureTarFile, atomic_contents_add import voluptuous as vol from voluptuous.humanize import humanize_error +from ..backups.utils import backup_data_filter from ..const import ( ATTR_ACCESS_TOKEN, ATTR_AUDIO_INPUT, @@ -497,8 +498,7 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes): with tar_file as backup: backup.extractall( path=temp_path, - members=secure_path(backup), - filter="fully_trusted", + filter=backup_data_filter, ) except tarfile.TarError as err: raise HomeAssistantError( diff --git a/tests/backups/test_backup_security.py b/tests/backups/test_backup_security.py new file mode 100644 index 000000000..5c99a17a2 --- /dev/null +++ b/tests/backups/test_backup_security.py @@ -0,0 +1,198 @@ +"""Security tests for backup tar extraction.""" + +import io +from pathlib import Path +import tarfile +from tempfile import TemporaryDirectory + +from supervisor.backups.backup import Backup +from supervisor.backups.const import BackupType +from supervisor.coresys import CoreSys + + +def _create_tar_gz( + path: Path, + members: list[tarfile.TarInfo], + file_data: dict[str, bytes] | None = None, +) -> None: + """Create a tar.gz file with specified members.""" + if file_data is None: + file_data = {} + with tarfile.open(path, "w:gz") as tar: + for info in members: + data = file_data.get(info.name) + if data is not None: + tar.addfile(info, io.BytesIO(data)) + else: + tar.addfile(info) + + +def _setup_backup_for_folder_restore( + coresys: CoreSys, tmp_path: Path +) -> tuple[Backup, Path]: + """Set up a backup object ready for _folder_restore testing. + + Returns the backup and the path where inner tar files should be placed. + """ + backup = Backup(coresys, tmp_path / "test.tar", "test", None) + backup.new("Test", "2023-07-21T21:05:00.000000+00:00", BackupType.FULL) + + # Create a directory that simulates the extracted outer backup tar + inner_dir = tmp_path / "extracted_backup" + inner_dir.mkdir() + backup._tmp = TemporaryDirectory() + # Override .name to point at our controlled directory + backup._tmp.name = str(inner_dir) + + return backup, inner_dir + + +async def test_absolute_symlink_in_folder_tar_skipped( + coresys: CoreSys, tmp_supervisor_data: Path, tmp_path: Path +): + """Test that absolute symlinks in folder tars are skipped during restore.""" + backup, inner_dir = _setup_backup_for_folder_restore(coresys, tmp_path) + + # Create a tar.gz with an absolute symlink and a normal file + evil_info = tarfile.TarInfo(name="evil_link") + evil_info.type = tarfile.SYMTYPE + evil_info.linkname = "/etc/shadow" + normal_info = tarfile.TarInfo(name="normal.txt") + normal_info.size = 5 + _create_tar_gz( + inner_dir / "share.tar.gz", + [evil_info, normal_info], + {"normal.txt": b"hello"}, + ) + + origin_dir = coresys.config.path_supervisor / "share" + origin_dir.mkdir(parents=True, exist_ok=True) + + # Should succeed — absolute symlink is skipped, not fatal + await backup._folder_restore("share") + + # Normal file extracted, dangerous symlink was not + assert (origin_dir / "normal.txt").read_text() == "hello" + assert not (origin_dir / "evil_link").exists() + + +async def test_relative_symlink_escape_in_folder_tar_skipped( + coresys: CoreSys, tmp_supervisor_data: Path, tmp_path: Path +): + """Test that relative symlinks escaping destination are skipped.""" + backup, inner_dir = _setup_backup_for_folder_restore(coresys, tmp_path) + + # Create a tar.gz with an escaping symlink and a normal file + evil_info = tarfile.TarInfo(name="escape_link") + evil_info.type = tarfile.SYMTYPE + evil_info.linkname = "../../etc/shadow" + normal_info = tarfile.TarInfo(name="normal.txt") + normal_info.size = 5 + _create_tar_gz( + inner_dir / "share.tar.gz", + [evil_info, normal_info], + {"normal.txt": b"hello"}, + ) + + origin_dir = coresys.config.path_supervisor / "share" + origin_dir.mkdir(parents=True, exist_ok=True) + + # Should succeed — escaping symlink is skipped, not fatal + await backup._folder_restore("share") + + assert (origin_dir / "normal.txt").read_text() == "hello" + assert not (origin_dir / "escape_link").exists() + + +async def test_device_node_in_folder_tar_skipped( + coresys: CoreSys, tmp_supervisor_data: Path, tmp_path: Path +): + """Test that device nodes in folder tars are skipped during restore.""" + backup, inner_dir = _setup_backup_for_folder_restore(coresys, tmp_path) + + # Create a tar.gz with a character device and a normal file + evil_info = tarfile.TarInfo(name="evil_device") + evil_info.type = tarfile.CHRTYPE + evil_info.devmajor = 1 + evil_info.devminor = 5 # /dev/zero + normal_info = tarfile.TarInfo(name="normal.txt") + normal_info.size = 5 + _create_tar_gz( + inner_dir / "share.tar.gz", + [evil_info, normal_info], + {"normal.txt": b"hello"}, + ) + + origin_dir = coresys.config.path_supervisor / "share" + origin_dir.mkdir(parents=True, exist_ok=True) + + # Should succeed — device node is skipped, not fatal + await backup._folder_restore("share") + + assert (origin_dir / "normal.txt").read_text() == "hello" + assert not (origin_dir / "evil_device").exists() + + +async def test_path_traversal_in_folder_tar_rejected( + coresys: CoreSys, tmp_supervisor_data: Path, tmp_path: Path +): + """Test that path traversal entries are filtered out by secure_path.""" + backup, inner_dir = _setup_backup_for_folder_restore(coresys, tmp_path) + + # Create a tar.gz with a path traversal entry alongside a normal file + traversal_info = tarfile.TarInfo(name="../../etc/passwd") + traversal_info.size = 9 + normal_info = tarfile.TarInfo(name="normal.txt") + normal_info.size = 5 + _create_tar_gz( + inner_dir / "share.tar.gz", + [traversal_info, normal_info], + {"../../etc/passwd": b"malicious", "normal.txt": b"hello"}, + ) + + origin_dir = coresys.config.path_supervisor / "share" + origin_dir.mkdir(parents=True, exist_ok=True) + + # Should not raise — secure_path silently filters out the traversal entry + await backup._folder_restore("share") + + # Normal file should be extracted + assert (origin_dir / "normal.txt").read_text() == "hello" + + # Traversal file should NOT exist anywhere outside origin_dir + assert not (origin_dir / "../../etc/passwd").exists() + + +async def test_valid_backup_with_internal_symlinks( + coresys: CoreSys, tmp_supervisor_data: Path, tmp_path: Path +): + """Test that valid backups with internal relative symlinks extract correctly.""" + backup, inner_dir = _setup_backup_for_folder_restore(coresys, tmp_path) + + # Create a tar.gz with a directory, a file, and an internal relative symlink + dir_info = tarfile.TarInfo(name="subdir") + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o755 + + file_info = tarfile.TarInfo(name="subdir/config.yaml") + file_info.size = 11 + + link_info = tarfile.TarInfo(name="config_link") + link_info.type = tarfile.SYMTYPE + link_info.linkname = "subdir/config.yaml" + + _create_tar_gz( + inner_dir / "share.tar.gz", + [dir_info, file_info, link_info], + {"subdir/config.yaml": b"key: value\n"}, + ) + + origin_dir = coresys.config.path_supervisor / "share" + origin_dir.mkdir(parents=True, exist_ok=True) + + await backup._folder_restore("share") + + # Verify the symlink was extracted and points to the right target + assert (origin_dir / "subdir" / "config.yaml").read_text() == "key: value\n" + assert (origin_dir / "config_link").is_symlink() + assert (origin_dir / "config_link").read_text() == "key: value\n"