diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index e2057a61c..256cbbeca 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -20,7 +20,7 @@ from typing import Any, Final, cast import aiohttp from awesomeversion import AwesomeVersion, AwesomeVersionCompareException from deepmerge import Merger -from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path +from securetar import AddFileError, SecureTarFile, atomic_contents_add import voluptuous as vol from voluptuous.humanize import humanize_error @@ -76,6 +76,7 @@ from ..exceptions import ( AddonsError, AddonsJobError, AddonUnknownError, + BackupInvalidError, BackupRestoreUnknownError, ConfigurationFileError, DockerBuildError, @@ -1444,10 +1445,11 @@ class Addon(AddonModel): tmp = TemporaryDirectory(dir=self.sys_config.path_tmp) try: with tar_file as backup: + # The tar filter rejects path traversal and absolute names, + # aborting restore of malicious backups with such exploits. backup.extractall( path=tmp.name, - members=secure_path(backup), - filter="fully_trusted", + filter="tar", ) data = read_json_file(Path(tmp.name, "addon.json")) @@ -1459,8 +1461,12 @@ class Addon(AddonModel): try: tmp, data = await self.sys_run_in_executor(_extract_tarfile) + except tarfile.FilterError as err: + raise BackupInvalidError( + f"Can't extract backup tarfile for {self.slug}: {err}", + _LOGGER.error, + ) from err except tarfile.TarError as err: - _LOGGER.error("Can't extract backup tarfile for %s: %s", self.slug, err) raise BackupRestoreUnknownError() from err except ConfigurationFileError as err: raise AddonUnknownError(addon=self.slug) from err diff --git a/supervisor/backups/backup.py b/supervisor/backups/backup.py index 9a626aae9..bdfa85e65 100644 --- a/supervisor/backups/backup.py +++ b/supervisor/backups/backup.py @@ -18,7 +18,7 @@ import time from typing import Any, Self, cast from awesomeversion import AwesomeVersion, AwesomeVersionCompareException -from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path +from securetar import AddFileError, SecureTarFile, atomic_contents_add import voluptuous as vol from voluptuous.humanize import humanize_error @@ -512,12 +512,24 @@ class Backup(JobGroup): ) tmp = TemporaryDirectory(dir=str(backup_tarfile.parent)) - with tarfile.open(backup_tarfile, "r:") as tar: - tar.extractall( - path=tmp.name, - members=secure_path(tar), - filter="fully_trusted", - ) + try: + with tarfile.open(backup_tarfile, "r:") as tar: + # The tar filter rejects path traversal and absolute names, + # aborting restore of potentially crafted backups. + tar.extractall( + path=tmp.name, + filter="tar", + ) + except tarfile.FilterError as err: + raise BackupInvalidError( + f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}", + _LOGGER.error, + ) from err + except tarfile.TarError as err: + raise BackupError( + f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}", + _LOGGER.error, + ) from err return tmp @@ -798,10 +810,17 @@ class Backup(JobGroup): bufsize=BUF_SIZE, password=self._password, ) as tar_file: + # The tar filter rejects path traversal and absolute names, + # aborting restore of potentially crafted backups. tar_file.extractall( - path=origin_dir, members=tar_file, filter="fully_trusted" + path=origin_dir, + filter="tar", ) _LOGGER.info("Restore folder %s done", name) + except tarfile.FilterError as err: + raise BackupInvalidError( + f"Can't restore folder {name}: {err}", _LOGGER.warning + ) from err except (tarfile.TarError, OSError) as err: raise BackupError( f"Can't restore folder {name}: {err}", _LOGGER.warning diff --git a/supervisor/homeassistant/module.py b/supervisor/homeassistant/module.py index 6d6a0f6ad..0e9d77e0c 100644 --- a/supervisor/homeassistant/module.py +++ b/supervisor/homeassistant/module.py @@ -12,7 +12,7 @@ from typing import Any from uuid import UUID from awesomeversion import AwesomeVersion, AwesomeVersionException -from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path +from securetar import AddFileError, SecureTarFile, atomic_contents_add import voluptuous as vol from voluptuous.humanize import humanize_error @@ -38,6 +38,7 @@ from ..const import ( ) from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import ( + BackupInvalidError, ConfigurationFileError, HomeAssistantBackupError, HomeAssistantError, @@ -492,11 +493,16 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes): # extract backup try: with tar_file as backup: + # The tar filter rejects path traversal and absolute names, + # aborting restore of potentially crafted backups. backup.extractall( path=temp_path, - members=secure_path(backup), - filter="fully_trusted", + filter="tar", ) + except tarfile.FilterError as err: + raise BackupInvalidError( + f"Invalid tarfile {tar_file}: {err}", _LOGGER.error + ) from err except tarfile.TarError as err: raise HomeAssistantError( f"Can't read tarfile {tar_file}: {err}", _LOGGER.error diff --git a/tests/backups/test_backup_security.py b/tests/backups/test_backup_security.py new file mode 100644 index 000000000..596d0927b --- /dev/null +++ b/tests/backups/test_backup_security.py @@ -0,0 +1,257 @@ +"""Security tests for backup tar extraction with tar filter.""" + +import io +from pathlib import Path +import tarfile + +import pytest +from securetar import SecureTarFile + +from supervisor.addons.addon import Addon +from supervisor.backups.backup import Backup +from supervisor.backups.const import BackupType +from supervisor.coresys import CoreSys +from supervisor.exceptions import BackupInvalidError + + +def _create_tar_gz( + path: Path, + members: list[tarfile.TarInfo], + file_data: dict[str, bytes] | None = None, +) -> None: + """Create a tar.gz file with specified members.""" + if file_data is None: + file_data = {} + with tarfile.open(path, "w:gz") as tar: + for info in members: + data = file_data.get(info.name) + if data is not None: + tar.addfile(info, io.BytesIO(data)) + else: + tar.addfile(info) + + +def test_path_traversal_rejected(tmp_path: Path): + """Test that path traversal in member names is rejected.""" + traversal_info = tarfile.TarInfo(name="../../etc/passwd") + traversal_info.size = 9 + tar_path = tmp_path / "test.tar.gz" + _create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"}) + + dest = tmp_path / "out" + dest.mkdir() + with ( + tarfile.open(tar_path, "r:gz") as tar, + pytest.raises(tarfile.OutsideDestinationError), + ): + tar.extractall(path=dest, filter="tar") + + +def test_symlink_write_through_rejected(tmp_path: Path): + """Test that writing through a symlink to outside destination is rejected. + + The tar filter's realpath check follows already-extracted symlinks on disk, + catching write-through attacks even without explicit link target validation. + """ + # Symlink pointing outside, then a file entry writing through it + link_info = tarfile.TarInfo(name="escape") + link_info.type = tarfile.SYMTYPE + link_info.linkname = "../outside" + file_info = tarfile.TarInfo(name="escape/evil.py") + file_info.size = 9 + tar_path = tmp_path / "test.tar.gz" + _create_tar_gz( + tar_path, + [link_info, file_info], + {"escape/evil.py": b"malicious"}, + ) + + dest = tmp_path / "out" + dest.mkdir() + with ( + tarfile.open(tar_path, "r:gz") as tar, + pytest.raises(tarfile.OutsideDestinationError), + ): + tar.extractall(path=dest, filter="tar") + + # The evil file must not exist outside the destination + assert not (tmp_path / "outside" / "evil.py").exists() + + +def test_absolute_name_stripped_and_extracted(tmp_path: Path): + """Test that absolute member names have leading / stripped and extract safely.""" + info = tarfile.TarInfo(name="/etc/test.conf") + info.size = 5 + tar_path = tmp_path / "test.tar.gz" + _create_tar_gz(tar_path, [info], {"/etc/test.conf": b"hello"}) + + dest = tmp_path / "out" + dest.mkdir() + with tarfile.open(tar_path, "r:gz") as tar: + tar.extractall(path=dest, filter="tar") + + # Extracted inside destination with leading / stripped + assert (dest / "etc" / "test.conf").read_text() == "hello" + + +def test_valid_backup_with_internal_symlinks(tmp_path: Path): + """Test that valid backups with internal relative symlinks extract correctly.""" + dir_info = tarfile.TarInfo(name="subdir") + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o755 + + file_info = tarfile.TarInfo(name="subdir/config.yaml") + file_info.size = 11 + + link_info = tarfile.TarInfo(name="config_link") + link_info.type = tarfile.SYMTYPE + link_info.linkname = "subdir/config.yaml" + + tar_path = tmp_path / "test.tar.gz" + _create_tar_gz( + tar_path, + [dir_info, file_info, link_info], + {"subdir/config.yaml": b"key: value\n"}, + ) + + dest = tmp_path / "out" + dest.mkdir() + with tarfile.open(tar_path, "r:gz") as tar: + tar.extractall(path=dest, filter="tar") + + assert (dest / "subdir" / "config.yaml").read_text() == "key: value\n" + assert (dest / "config_link").is_symlink() + assert (dest / "config_link").read_text() == "key: value\n" + + +def test_uid_gid_preserved(tmp_path: Path): + """Test that tar filter preserves file ownership.""" + info = tarfile.TarInfo(name="owned_file.txt") + info.size = 5 + info.uid = 1000 + info.gid = 1000 + tar_path = tmp_path / "test.tar.gz" + _create_tar_gz(tar_path, [info], {"owned_file.txt": b"hello"}) + + dest = tmp_path / "out" + dest.mkdir() + with tarfile.open(tar_path, "r:gz") as tar: + # Extract member via filter only (don't actually extract, just check + # the filter preserves uid/gid) + for member in tar: + filtered = tarfile.tar_filter(member, str(dest)) + assert filtered.uid == 1000 + assert filtered.gid == 1000 + + +async def test_backup_open_rejects_path_traversal(coresys: CoreSys, tmp_path: Path): + """Test that Backup.open() raises BackupInvalidError for path traversal.""" + tar_path = tmp_path / "malicious.tar" + traversal_info = tarfile.TarInfo(name="../../etc/passwd") + traversal_info.size = 9 + with tarfile.open(tar_path, "w:") as tar: + tar.addfile(traversal_info, io.BytesIO(b"malicious")) + + backup = Backup(coresys, tar_path, "test", None) + with pytest.raises(BackupInvalidError): + async with backup.open(None): + pass + + +async def test_homeassistant_restore_rejects_path_traversal( + coresys: CoreSys, tmp_supervisor_data: Path +): + """Test that Home Assistant restore raises BackupInvalidError for path traversal.""" + tar_path = tmp_supervisor_data / "homeassistant.tar.gz" + traversal_info = tarfile.TarInfo(name="../../etc/passwd") + traversal_info.size = 9 + _create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"}) + + tar_file = SecureTarFile(tar_path, "r", gzip=True) + with pytest.raises(BackupInvalidError): + await coresys.homeassistant.restore(tar_file) + + +async def test_addon_restore_rejects_path_traversal( + coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path +): + """Test that add-on restore raises BackupInvalidError for path traversal.""" + tar_path = tmp_supervisor_data / "addon.tar.gz" + traversal_info = tarfile.TarInfo(name="../../etc/passwd") + traversal_info.size = 9 + _create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"}) + + tar_file = SecureTarFile(tar_path, "r", gzip=True) + with pytest.raises(BackupInvalidError): + await install_addon_ssh.restore(tar_file) + + +async def test_addon_restore_rejects_symlink_escape( + coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path +): + """Test that add-on restore raises BackupInvalidError for symlink escape.""" + link_info = tarfile.TarInfo(name="escape") + link_info.type = tarfile.SYMTYPE + link_info.linkname = "../outside" + file_info = tarfile.TarInfo(name="escape/evil.py") + file_info.size = 9 + + tar_path = tmp_supervisor_data / "addon.tar.gz" + _create_tar_gz( + tar_path, + [link_info, file_info], + {"escape/evil.py": b"malicious"}, + ) + + tar_file = SecureTarFile(tar_path, "r", gzip=True) + with pytest.raises(BackupInvalidError): + await install_addon_ssh.restore(tar_file) + + +async def test_folder_restore_rejects_path_traversal( + coresys: CoreSys, tmp_supervisor_data: Path +): + """Test that folder restore rejects path traversal in backup tar.""" + traversal_info = tarfile.TarInfo(name="../../etc/passwd") + traversal_info.size = 9 + + # Create backup with a malicious share folder tar inside + backup_tar_path = tmp_supervisor_data / "backup.tar" + with tarfile.open(backup_tar_path, "w:") as outer_tar: + share_tar_path = tmp_supervisor_data / "share.tar.gz" + _create_tar_gz( + share_tar_path, [traversal_info], {"../../etc/passwd": b"malicious"} + ) + outer_tar.add(share_tar_path, arcname="./share.tar.gz") + + backup = Backup(coresys, backup_tar_path, "test", None) + backup.new("test", "2025-01-01", BackupType.PARTIAL, compressed=True) + async with backup.open(None): + assert await backup.restore_folders(["share"]) is False + + +async def test_folder_restore_rejects_symlink_escape( + coresys: CoreSys, tmp_supervisor_data: Path +): + """Test that folder restore rejects symlink escape in backup tar.""" + link_info = tarfile.TarInfo(name="escape") + link_info.type = tarfile.SYMTYPE + link_info.linkname = "../outside" + file_info = tarfile.TarInfo(name="escape/evil.py") + file_info.size = 9 + + # Create backup with a malicious share folder tar inside + backup_tar_path = tmp_supervisor_data / "backup.tar" + with tarfile.open(backup_tar_path, "w:") as outer_tar: + share_tar_path = tmp_supervisor_data / "share.tar.gz" + _create_tar_gz( + share_tar_path, + [link_info, file_info], + {"escape/evil.py": b"malicious"}, + ) + outer_tar.add(share_tar_path, arcname="./share.tar.gz") + + backup = Backup(coresys, backup_tar_path, "test", None) + backup.new("test", "2025-01-01", BackupType.PARTIAL, compressed=True) + async with backup.open(None): + assert await backup.restore_folders(["share"]) is False