mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-02 00:07:16 +01:00
Harden backup tar extraction with Python tar_filter (#6559)
* Harden backup tar extraction with Python data filter Replace filter="fully_trusted" with a custom backup_data_filter that wraps tarfile.data_filter. This adds protection against symlink attacks (absolute targets, destination escapes), device node injection, and path traversal, while resetting uid/gid and sanitizing permissions. Unlike using data_filter directly, the custom filter skips problematic entries with a warning instead of aborting the entire extraction. This ensures existing backups containing absolute symlinks (e.g. in shared folders) still restore successfully with the dangerous entries omitted. Also removes the now-redundant secure_path member filtering, as data_filter is a strict superset of its protections. Fixes a standalone bug in _folder_restore which had no member filtering at all. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Simplify security tests to test backup_data_filter directly Test the public backup_data_filter function with plain tarfile extraction instead of going through Backup internals. Removes protected-access pylint warnings and unnecessary coresys setup. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Switch to tar filter instead of custom data filter wrapper Replace backup_data_filter (which wrapped data_filter and skipped problematic entries) with the built-in tar filter. The tar filter rejects path traversal and absolute names while preserving uid/gid and file permissions, which is important for add-ons running as non-root users. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Apply suggestions from code review Co-authored-by: Erik Montnemery <erik@montnemery.com> * Use BackupInvalidError instead of BackupError for tarfile.TarError Make sure FilterErrors lead to BackupInvalidError instead of BackupError, as they are not related to the backup process itself but rather to the integrity of the backup data. * Improve test coverage and use pytest.raises * Only make FilterError a BackupInvalidError * Add test case for FilterError during Home Assistant Core restore * Add test cases for Add-ons * Fix pylint warnings --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
@@ -20,7 +20,7 @@ from typing import Any, Final, cast
|
||||
import aiohttp
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from deepmerge import Merger
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -76,6 +76,7 @@ from ..exceptions import (
|
||||
AddonsError,
|
||||
AddonsJobError,
|
||||
AddonUnknownError,
|
||||
BackupInvalidError,
|
||||
BackupRestoreUnknownError,
|
||||
ConfigurationFileError,
|
||||
DockerBuildError,
|
||||
@@ -1444,10 +1445,11 @@ class Addon(AddonModel):
|
||||
tmp = TemporaryDirectory(dir=self.sys_config.path_tmp)
|
||||
try:
|
||||
with tar_file as backup:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of malicious backups with such exploits.
|
||||
backup.extractall(
|
||||
path=tmp.name,
|
||||
members=secure_path(backup),
|
||||
filter="fully_trusted",
|
||||
filter="tar",
|
||||
)
|
||||
|
||||
data = read_json_file(Path(tmp.name, "addon.json"))
|
||||
@@ -1459,8 +1461,12 @@ class Addon(AddonModel):
|
||||
|
||||
try:
|
||||
tmp, data = await self.sys_run_in_executor(_extract_tarfile)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Can't extract backup tarfile for {self.slug}: {err}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
except tarfile.TarError as err:
|
||||
_LOGGER.error("Can't extract backup tarfile for %s: %s", self.slug, err)
|
||||
raise BackupRestoreUnknownError() from err
|
||||
except ConfigurationFileError as err:
|
||||
raise AddonUnknownError(addon=self.slug) from err
|
||||
|
||||
@@ -18,7 +18,7 @@ import time
|
||||
from typing import Any, Self, cast
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -512,12 +512,24 @@ class Backup(JobGroup):
|
||||
)
|
||||
tmp = TemporaryDirectory(dir=str(backup_tarfile.parent))
|
||||
|
||||
with tarfile.open(backup_tarfile, "r:") as tar:
|
||||
tar.extractall(
|
||||
path=tmp.name,
|
||||
members=secure_path(tar),
|
||||
filter="fully_trusted",
|
||||
)
|
||||
try:
|
||||
with tarfile.open(backup_tarfile, "r:") as tar:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
tar.extractall(
|
||||
path=tmp.name,
|
||||
filter="tar",
|
||||
)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
except tarfile.TarError as err:
|
||||
raise BackupError(
|
||||
f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
|
||||
return tmp
|
||||
|
||||
@@ -798,10 +810,17 @@ class Backup(JobGroup):
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
) as tar_file:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
tar_file.extractall(
|
||||
path=origin_dir, members=tar_file, filter="fully_trusted"
|
||||
path=origin_dir,
|
||||
filter="tar",
|
||||
)
|
||||
_LOGGER.info("Restore folder %s done", name)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Can't restore folder {name}: {err}", _LOGGER.warning
|
||||
) from err
|
||||
except (tarfile.TarError, OSError) as err:
|
||||
raise BackupError(
|
||||
f"Can't restore folder {name}: {err}", _LOGGER.warning
|
||||
|
||||
@@ -12,7 +12,7 @@ from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionException
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -38,6 +38,7 @@ from ..const import (
|
||||
)
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import (
|
||||
BackupInvalidError,
|
||||
ConfigurationFileError,
|
||||
HomeAssistantBackupError,
|
||||
HomeAssistantError,
|
||||
@@ -492,11 +493,16 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
|
||||
# extract backup
|
||||
try:
|
||||
with tar_file as backup:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
backup.extractall(
|
||||
path=temp_path,
|
||||
members=secure_path(backup),
|
||||
filter="fully_trusted",
|
||||
filter="tar",
|
||||
)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Invalid tarfile {tar_file}: {err}", _LOGGER.error
|
||||
) from err
|
||||
except tarfile.TarError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Can't read tarfile {tar_file}: {err}", _LOGGER.error
|
||||
|
||||
257
tests/backups/test_backup_security.py
Normal file
257
tests/backups/test_backup_security.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""Security tests for backup tar extraction with tar filter."""
|
||||
|
||||
import io
|
||||
from pathlib import Path
|
||||
import tarfile
|
||||
|
||||
import pytest
|
||||
from securetar import SecureTarFile
|
||||
|
||||
from supervisor.addons.addon import Addon
|
||||
from supervisor.backups.backup import Backup
|
||||
from supervisor.backups.const import BackupType
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.exceptions import BackupInvalidError
|
||||
|
||||
|
||||
def _create_tar_gz(
|
||||
path: Path,
|
||||
members: list[tarfile.TarInfo],
|
||||
file_data: dict[str, bytes] | None = None,
|
||||
) -> None:
|
||||
"""Create a tar.gz file with specified members."""
|
||||
if file_data is None:
|
||||
file_data = {}
|
||||
with tarfile.open(path, "w:gz") as tar:
|
||||
for info in members:
|
||||
data = file_data.get(info.name)
|
||||
if data is not None:
|
||||
tar.addfile(info, io.BytesIO(data))
|
||||
else:
|
||||
tar.addfile(info)
|
||||
|
||||
|
||||
def test_path_traversal_rejected(tmp_path: Path):
|
||||
"""Test that path traversal in member names is rejected."""
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with (
|
||||
tarfile.open(tar_path, "r:gz") as tar,
|
||||
pytest.raises(tarfile.OutsideDestinationError),
|
||||
):
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
|
||||
def test_symlink_write_through_rejected(tmp_path: Path):
|
||||
"""Test that writing through a symlink to outside destination is rejected.
|
||||
|
||||
The tar filter's realpath check follows already-extracted symlinks on disk,
|
||||
catching write-through attacks even without explicit link target validation.
|
||||
"""
|
||||
# Symlink pointing outside, then a file entry writing through it
|
||||
link_info = tarfile.TarInfo(name="escape")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "../outside"
|
||||
file_info = tarfile.TarInfo(name="escape/evil.py")
|
||||
file_info.size = 9
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(
|
||||
tar_path,
|
||||
[link_info, file_info],
|
||||
{"escape/evil.py": b"malicious"},
|
||||
)
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with (
|
||||
tarfile.open(tar_path, "r:gz") as tar,
|
||||
pytest.raises(tarfile.OutsideDestinationError),
|
||||
):
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
# The evil file must not exist outside the destination
|
||||
assert not (tmp_path / "outside" / "evil.py").exists()
|
||||
|
||||
|
||||
def test_absolute_name_stripped_and_extracted(tmp_path: Path):
|
||||
"""Test that absolute member names have leading / stripped and extract safely."""
|
||||
info = tarfile.TarInfo(name="/etc/test.conf")
|
||||
info.size = 5
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [info], {"/etc/test.conf": b"hello"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
# Extracted inside destination with leading / stripped
|
||||
assert (dest / "etc" / "test.conf").read_text() == "hello"
|
||||
|
||||
|
||||
def test_valid_backup_with_internal_symlinks(tmp_path: Path):
|
||||
"""Test that valid backups with internal relative symlinks extract correctly."""
|
||||
dir_info = tarfile.TarInfo(name="subdir")
|
||||
dir_info.type = tarfile.DIRTYPE
|
||||
dir_info.mode = 0o755
|
||||
|
||||
file_info = tarfile.TarInfo(name="subdir/config.yaml")
|
||||
file_info.size = 11
|
||||
|
||||
link_info = tarfile.TarInfo(name="config_link")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "subdir/config.yaml"
|
||||
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(
|
||||
tar_path,
|
||||
[dir_info, file_info, link_info],
|
||||
{"subdir/config.yaml": b"key: value\n"},
|
||||
)
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
assert (dest / "subdir" / "config.yaml").read_text() == "key: value\n"
|
||||
assert (dest / "config_link").is_symlink()
|
||||
assert (dest / "config_link").read_text() == "key: value\n"
|
||||
|
||||
|
||||
def test_uid_gid_preserved(tmp_path: Path):
|
||||
"""Test that tar filter preserves file ownership."""
|
||||
info = tarfile.TarInfo(name="owned_file.txt")
|
||||
info.size = 5
|
||||
info.uid = 1000
|
||||
info.gid = 1000
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [info], {"owned_file.txt": b"hello"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
# Extract member via filter only (don't actually extract, just check
|
||||
# the filter preserves uid/gid)
|
||||
for member in tar:
|
||||
filtered = tarfile.tar_filter(member, str(dest))
|
||||
assert filtered.uid == 1000
|
||||
assert filtered.gid == 1000
|
||||
|
||||
|
||||
async def test_backup_open_rejects_path_traversal(coresys: CoreSys, tmp_path: Path):
|
||||
"""Test that Backup.open() raises BackupInvalidError for path traversal."""
|
||||
tar_path = tmp_path / "malicious.tar"
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
with tarfile.open(tar_path, "w:") as tar:
|
||||
tar.addfile(traversal_info, io.BytesIO(b"malicious"))
|
||||
|
||||
backup = Backup(coresys, tar_path, "test", None)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
async with backup.open(None):
|
||||
pass
|
||||
|
||||
|
||||
async def test_homeassistant_restore_rejects_path_traversal(
|
||||
coresys: CoreSys, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test that Home Assistant restore raises BackupInvalidError for path traversal."""
|
||||
tar_path = tmp_supervisor_data / "homeassistant.tar.gz"
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await coresys.homeassistant.restore(tar_file)
|
||||
|
||||
|
||||
async def test_addon_restore_rejects_path_traversal(
|
||||
coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test that add-on restore raises BackupInvalidError for path traversal."""
|
||||
tar_path = tmp_supervisor_data / "addon.tar.gz"
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await install_addon_ssh.restore(tar_file)
|
||||
|
||||
|
||||
async def test_addon_restore_rejects_symlink_escape(
|
||||
coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test that add-on restore raises BackupInvalidError for symlink escape."""
|
||||
link_info = tarfile.TarInfo(name="escape")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "../outside"
|
||||
file_info = tarfile.TarInfo(name="escape/evil.py")
|
||||
file_info.size = 9
|
||||
|
||||
tar_path = tmp_supervisor_data / "addon.tar.gz"
|
||||
_create_tar_gz(
|
||||
tar_path,
|
||||
[link_info, file_info],
|
||||
{"escape/evil.py": b"malicious"},
|
||||
)
|
||||
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await install_addon_ssh.restore(tar_file)
|
||||
|
||||
|
||||
async def test_folder_restore_rejects_path_traversal(
|
||||
coresys: CoreSys, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test that folder restore rejects path traversal in backup tar."""
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
|
||||
# Create backup with a malicious share folder tar inside
|
||||
backup_tar_path = tmp_supervisor_data / "backup.tar"
|
||||
with tarfile.open(backup_tar_path, "w:") as outer_tar:
|
||||
share_tar_path = tmp_supervisor_data / "share.tar.gz"
|
||||
_create_tar_gz(
|
||||
share_tar_path, [traversal_info], {"../../etc/passwd": b"malicious"}
|
||||
)
|
||||
outer_tar.add(share_tar_path, arcname="./share.tar.gz")
|
||||
|
||||
backup = Backup(coresys, backup_tar_path, "test", None)
|
||||
backup.new("test", "2025-01-01", BackupType.PARTIAL, compressed=True)
|
||||
async with backup.open(None):
|
||||
assert await backup.restore_folders(["share"]) is False
|
||||
|
||||
|
||||
async def test_folder_restore_rejects_symlink_escape(
|
||||
coresys: CoreSys, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test that folder restore rejects symlink escape in backup tar."""
|
||||
link_info = tarfile.TarInfo(name="escape")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "../outside"
|
||||
file_info = tarfile.TarInfo(name="escape/evil.py")
|
||||
file_info.size = 9
|
||||
|
||||
# Create backup with a malicious share folder tar inside
|
||||
backup_tar_path = tmp_supervisor_data / "backup.tar"
|
||||
with tarfile.open(backup_tar_path, "w:") as outer_tar:
|
||||
share_tar_path = tmp_supervisor_data / "share.tar.gz"
|
||||
_create_tar_gz(
|
||||
share_tar_path,
|
||||
[link_info, file_info],
|
||||
{"escape/evil.py": b"malicious"},
|
||||
)
|
||||
outer_tar.add(share_tar_path, arcname="./share.tar.gz")
|
||||
|
||||
backup = Backup(coresys, backup_tar_path, "test", None)
|
||||
backup.new("test", "2025-01-01", BackupType.PARTIAL, compressed=True)
|
||||
async with backup.open(None):
|
||||
assert await backup.restore_folders(["share"]) is False
|
||||
Reference in New Issue
Block a user