mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-04-02 00:07:16 +01:00
Bump securetar to 2026.2.0 (#6575)
* Bump securetar from 2025.12.0 to 2026.2.0 Adapt to the new securetar API: - Use SecureTarArchive for outer backup tar (replaces SecureTarFile with gzip=False for the outer container) - create_inner_tar() renamed to create_tar(), password now inherited from the archive rather than passed per inner tar - SecureTarFile no longer accepts a mode parameter (read-only by default, InnerSecureTarFile for writing) - Pass create_version=2 to keep protected backups at version 2 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Reformat imports * Rename _create_cleanup to _create_finalize and update docstring * Use constant for SecureTar create version * Add test for SecureTarReadError in validate_backup securetar >= 2026.2.0 raises SecureTarReadError instead of tarfile.ReadError for invalid passwords. Catching this exception and raising BackupInvalidError is required so Core shows the encryption key dialog to the user. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Handle InvalidPasswordError for v3 backups * Address typos * Add securetar v3 encrypted password test fixture Add a test fixture for a securetar v3 encrypted backup with password. This will be used in the test suite to verify that the backup extraction process correctly handles encrypted backups. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -23,7 +23,7 @@ pulsectl==24.12.0
|
||||
pyudev==0.24.4
|
||||
PyYAML==6.0.3
|
||||
requests==2.32.5
|
||||
securetar==2025.12.0
|
||||
securetar==2026.2.0
|
||||
sentry-sdk==2.53.0
|
||||
setuptools==82.0.0
|
||||
voluptuous==0.16.0
|
||||
|
||||
@@ -12,13 +12,19 @@ import json
|
||||
import logging
|
||||
from pathlib import Path, PurePath
|
||||
import tarfile
|
||||
from tarfile import TarFile
|
||||
from tempfile import TemporaryDirectory
|
||||
import time
|
||||
from typing import Any, Self, cast
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
from securetar import (
|
||||
AddFileError,
|
||||
InvalidPasswordError,
|
||||
SecureTarArchive,
|
||||
SecureTarFile,
|
||||
SecureTarReadError,
|
||||
atomic_contents_add,
|
||||
)
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -59,7 +65,7 @@ from ..utils import remove_folder
|
||||
from ..utils.dt import parse_datetime, utcnow
|
||||
from ..utils.json import json_bytes
|
||||
from ..utils.sentinel import DEFAULT
|
||||
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, BackupType
|
||||
from .const import BUF_SIZE, LOCATION_CLOUD_BACKUP, SECURETAR_CREATE_VERSION, BackupType
|
||||
from .validate import SCHEMA_BACKUP
|
||||
|
||||
IGNORED_COMPARISON_FIELDS = {ATTR_PROTECTED, ATTR_CRYPTO, ATTR_DOCKER}
|
||||
@@ -99,7 +105,7 @@ class Backup(JobGroup):
|
||||
)
|
||||
self._data: dict[str, Any] = data or {ATTR_SLUG: slug}
|
||||
self._tmp: TemporaryDirectory | None = None
|
||||
self._outer_secure_tarfile: SecureTarFile | None = None
|
||||
self._outer_secure_tarfile: SecureTarArchive | None = None
|
||||
self._password: str | None = None
|
||||
self._locations: dict[str | None, BackupLocation] = {
|
||||
location: BackupLocation(
|
||||
@@ -364,15 +370,17 @@ class Backup(JobGroup):
|
||||
test_tar_file = backup.extractfile(test_tar_name)
|
||||
try:
|
||||
with SecureTarFile(
|
||||
ending, # Not used
|
||||
gzip=self.compressed,
|
||||
mode="r",
|
||||
fileobj=test_tar_file,
|
||||
password=self._password,
|
||||
):
|
||||
# If we can read the tar file, the password is correct
|
||||
return
|
||||
except tarfile.ReadError as ex:
|
||||
except (
|
||||
tarfile.ReadError,
|
||||
SecureTarReadError,
|
||||
InvalidPasswordError,
|
||||
) as ex:
|
||||
raise BackupInvalidError(
|
||||
f"Invalid password for backup {self.slug}", _LOGGER.error
|
||||
) from ex
|
||||
@@ -441,7 +449,7 @@ class Backup(JobGroup):
|
||||
async def create(self) -> AsyncGenerator[None]:
|
||||
"""Create new backup file."""
|
||||
|
||||
def _open_outer_tarfile() -> tuple[SecureTarFile, tarfile.TarFile]:
|
||||
def _open_outer_tarfile() -> SecureTarArchive:
|
||||
"""Create and open outer tarfile."""
|
||||
if self.tarfile.is_file():
|
||||
raise BackupFileExistError(
|
||||
@@ -449,14 +457,15 @@ class Backup(JobGroup):
|
||||
_LOGGER.error,
|
||||
)
|
||||
|
||||
_outer_secure_tarfile = SecureTarFile(
|
||||
_outer_secure_tarfile = SecureTarArchive(
|
||||
self.tarfile,
|
||||
"w",
|
||||
gzip=False,
|
||||
bufsize=BUF_SIZE,
|
||||
create_version=SECURETAR_CREATE_VERSION,
|
||||
password=self._password,
|
||||
)
|
||||
try:
|
||||
_outer_tarfile = _outer_secure_tarfile.open()
|
||||
_outer_secure_tarfile.open()
|
||||
except PermissionError as ex:
|
||||
raise BackupPermissionError(
|
||||
f"Cannot open backup file {self.tarfile.as_posix()}, permission error!",
|
||||
@@ -468,11 +477,9 @@ class Backup(JobGroup):
|
||||
_LOGGER.error,
|
||||
) from ex
|
||||
|
||||
return _outer_secure_tarfile, _outer_tarfile
|
||||
return _outer_secure_tarfile
|
||||
|
||||
outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
|
||||
_open_outer_tarfile
|
||||
)
|
||||
outer_secure_tarfile = await self.sys_run_in_executor(_open_outer_tarfile)
|
||||
self._outer_secure_tarfile = outer_secure_tarfile
|
||||
|
||||
def _close_outer_tarfile() -> int:
|
||||
@@ -483,7 +490,7 @@ class Backup(JobGroup):
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
await self._create_cleanup(outer_tarfile)
|
||||
await self._create_finalize(outer_secure_tarfile)
|
||||
size_bytes = await self.sys_run_in_executor(_close_outer_tarfile)
|
||||
self._locations[self.location].size_bytes = size_bytes
|
||||
self._outer_secure_tarfile = None
|
||||
@@ -543,11 +550,11 @@ class Backup(JobGroup):
|
||||
if self._tmp:
|
||||
await self.sys_run_in_executor(self._tmp.cleanup)
|
||||
|
||||
async def _create_cleanup(self, outer_tarfile: TarFile) -> None:
|
||||
"""Cleanup after backup creation.
|
||||
async def _create_finalize(self, outer_archive: SecureTarArchive) -> None:
|
||||
"""Finalize backup creation.
|
||||
|
||||
Separate method to be called from create to ensure
|
||||
that cleanup is always performed, even if an exception is raised.
|
||||
Separate method to be called from create to ensure that the backup is
|
||||
finalized.
|
||||
"""
|
||||
# validate data
|
||||
try:
|
||||
@@ -566,7 +573,7 @@ class Backup(JobGroup):
|
||||
tar_info = tarfile.TarInfo(name="./backup.json")
|
||||
tar_info.size = len(raw_bytes)
|
||||
tar_info.mtime = int(time.time())
|
||||
outer_tarfile.addfile(tar_info, fileobj=fileobj)
|
||||
outer_archive.tar.addfile(tar_info, fileobj=fileobj)
|
||||
|
||||
try:
|
||||
await self.sys_run_in_executor(_add_backup_json)
|
||||
@@ -593,10 +600,9 @@ class Backup(JobGroup):
|
||||
|
||||
tar_name = f"{slug}.tar{'.gz' if self.compressed else ''}"
|
||||
|
||||
addon_file = self._outer_secure_tarfile.create_inner_tar(
|
||||
addon_file = self._outer_secure_tarfile.create_tar(
|
||||
f"./{tar_name}",
|
||||
gzip=self.compressed,
|
||||
password=self._password,
|
||||
)
|
||||
# Take backup
|
||||
try:
|
||||
@@ -646,7 +652,6 @@ class Backup(JobGroup):
|
||||
tar_name = f"{addon_slug}.tar{'.gz' if self.compressed else ''}"
|
||||
addon_file = SecureTarFile(
|
||||
Path(self._tmp.name, tar_name),
|
||||
"r",
|
||||
gzip=self.compressed,
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
@@ -742,10 +747,9 @@ class Backup(JobGroup):
|
||||
|
||||
return False
|
||||
|
||||
with outer_secure_tarfile.create_inner_tar(
|
||||
with outer_secure_tarfile.create_tar(
|
||||
f"./{tar_name}",
|
||||
gzip=self.compressed,
|
||||
password=self._password,
|
||||
) as tar_file:
|
||||
atomic_contents_add(
|
||||
tar_file,
|
||||
@@ -805,7 +809,6 @@ class Backup(JobGroup):
|
||||
_LOGGER.info("Restore folder %s", name)
|
||||
with SecureTarFile(
|
||||
tar_name,
|
||||
"r",
|
||||
gzip=self.compressed,
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
@@ -873,10 +876,9 @@ class Backup(JobGroup):
|
||||
|
||||
tar_name = f"homeassistant.tar{'.gz' if self.compressed else ''}"
|
||||
# Backup Home Assistant Core config directory
|
||||
homeassistant_file = self._outer_secure_tarfile.create_inner_tar(
|
||||
homeassistant_file = self._outer_secure_tarfile.create_tar(
|
||||
f"./{tar_name}",
|
||||
gzip=self.compressed,
|
||||
password=self._password,
|
||||
)
|
||||
|
||||
await self.sys_homeassistant.backup(homeassistant_file, exclude_database)
|
||||
@@ -900,7 +902,6 @@ class Backup(JobGroup):
|
||||
)
|
||||
homeassistant_file = SecureTarFile(
|
||||
tar_name,
|
||||
"r",
|
||||
gzip=self.compressed,
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Literal
|
||||
from ..mounts.mount import Mount
|
||||
|
||||
BUF_SIZE = 2**20 * 4 # 4MB
|
||||
SECURETAR_CREATE_VERSION = 2
|
||||
DEFAULT_FREEZE_TIMEOUT = 600
|
||||
LOCATION_CLOUD_BACKUP = ".cloud_backup"
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import aiodocker
|
||||
from aiodocker.containers import DockerContainer
|
||||
from awesomeversion import AwesomeVersion
|
||||
import pytest
|
||||
from securetar import SecureTarFile
|
||||
from securetar import SecureTarArchive, SecureTarFile
|
||||
|
||||
from supervisor.addons.addon import Addon
|
||||
from supervisor.addons.const import AddonBackupMode
|
||||
@@ -436,8 +436,11 @@ async def test_backup(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
archive.close()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", ["running", "stopped"])
|
||||
@@ -457,8 +460,11 @@ async def test_backup_no_config(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
archive.close()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
|
||||
@@ -473,14 +479,17 @@ async def test_backup_with_pre_post_command(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
with (
|
||||
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
|
||||
patch.object(
|
||||
Addon, "backup_post", new=PropertyMock(return_value="backup_post")
|
||||
),
|
||||
):
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
archive.close()
|
||||
|
||||
assert container.exec.call_count == 2
|
||||
assert container.exec.call_args_list[0].args[0] == "backup_pre"
|
||||
@@ -543,15 +552,18 @@ async def test_backup_with_pre_command_error(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
with (
|
||||
patch.object(DockerAddon, "is_running", return_value=True),
|
||||
patch.object(Addon, "backup_pre", new=PropertyMock(return_value="backup_pre")),
|
||||
pytest.raises(exc_type_raised),
|
||||
):
|
||||
assert await install_addon_ssh.backup(tarfile) is None
|
||||
assert await install_addon_ssh.backup(tar_file) is None
|
||||
|
||||
assert not tarfile.path.exists()
|
||||
assert not tar_file.path.exists()
|
||||
archive.close()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", ["running", "stopped"])
|
||||
@@ -568,7 +580,9 @@ async def test_backup_cold_mode(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
with (
|
||||
patch.object(
|
||||
AddonModel,
|
||||
@@ -579,7 +593,8 @@ async def test_backup_cold_mode(
|
||||
DockerAddon, "is_running", side_effect=[status == "running", False, False]
|
||||
),
|
||||
):
|
||||
start_task = await install_addon_ssh.backup(tarfile)
|
||||
start_task = await install_addon_ssh.backup(tar_file)
|
||||
archive.close()
|
||||
|
||||
assert bool(start_task) is (status == "running")
|
||||
|
||||
@@ -607,7 +622,9 @@ async def test_backup_cold_mode_with_watchdog(
|
||||
|
||||
# Patching out the normal end of backup process leaves the container in a stopped state
|
||||
# Watchdog should still not try to restart it though, it should remain this way
|
||||
tarfile = SecureTarFile(coresys.config.path_tmp / "test.tar.gz", "w")
|
||||
archive = SecureTarArchive(coresys.config.path_tmp / "test.tar", "w")
|
||||
archive.open()
|
||||
tar_file = archive.create_tar("./test.tar.gz")
|
||||
with (
|
||||
patch.object(Addon, "start") as start,
|
||||
patch.object(Addon, "restart") as restart,
|
||||
@@ -619,10 +636,11 @@ async def test_backup_cold_mode_with_watchdog(
|
||||
new=PropertyMock(return_value=AddonBackupMode.COLD),
|
||||
),
|
||||
):
|
||||
await install_addon_ssh.backup(tarfile)
|
||||
await install_addon_ssh.backup(tar_file)
|
||||
await asyncio.sleep(0)
|
||||
start.assert_not_called()
|
||||
restart.assert_not_called()
|
||||
archive.close()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", ["running", "stopped"])
|
||||
@@ -635,7 +653,7 @@ async def test_restore(coresys: CoreSys, install_addon_ssh: Addon, status: str)
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"), "r")
|
||||
tarfile = SecureTarFile(get_fixture_path(f"backup_local_ssh_{status}.tar.gz"))
|
||||
with patch.object(DockerAddon, "is_running", return_value=False):
|
||||
start_task = await coresys.addons.restore(TEST_ADDON_SLUG, tarfile)
|
||||
|
||||
@@ -655,7 +673,7 @@ async def test_restore_while_running(
|
||||
install_addon_ssh.path_data.mkdir()
|
||||
await install_addon_ssh.load()
|
||||
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
|
||||
with (
|
||||
patch.object(DockerAddon, "is_running", return_value=True),
|
||||
patch.object(Ingress, "update_hass_panel"),
|
||||
@@ -688,7 +706,7 @@ async def test_restore_while_running_with_watchdog(
|
||||
|
||||
# We restore a stopped backup so restore will not restart it
|
||||
# Watchdog will see it stop and should not attempt reanimation either
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"), "r")
|
||||
tarfile = SecureTarFile(get_fixture_path("backup_local_ssh_stopped.tar.gz"))
|
||||
with (
|
||||
patch.object(Addon, "start") as start,
|
||||
patch.object(Addon, "restart") as restart,
|
||||
|
||||
@@ -8,7 +8,7 @@ import tarfile
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from securetar import AddFileError
|
||||
from securetar import AddFileError, InvalidPasswordError, SecureTarReadError
|
||||
|
||||
from supervisor.addons.addon import Addon
|
||||
from supervisor.backups.backup import Backup, BackupLocation
|
||||
@@ -234,7 +234,21 @@ async def test_consolidate_failure(coresys: CoreSys, tmp_path: Path):
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup 93b462f8"
|
||||
),
|
||||
), # Invalid password
|
||||
), # Invalid password (legacy securetar exception)
|
||||
(
|
||||
None,
|
||||
SecureTarReadError,
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup 93b462f8"
|
||||
),
|
||||
), # Invalid password (securetar >= 2026.2.0 raises SecureTarReadError)
|
||||
(
|
||||
None,
|
||||
InvalidPasswordError,
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup 93b462f8"
|
||||
),
|
||||
), # Invalid password (securetar >= 2026.2.0 with v3 backup raises InvalidPasswordError)
|
||||
],
|
||||
)
|
||||
async def test_validate_backup(
|
||||
@@ -244,7 +258,12 @@ async def test_validate_backup(
|
||||
securetar_side_effect: type[Exception] | None,
|
||||
expected_exception: AbstractContextManager,
|
||||
):
|
||||
"""Parameterized test for validate_backup."""
|
||||
"""Parameterized test for validate_backup.
|
||||
|
||||
Note that it is paramount that BackupInvalidError is raised for invalid password
|
||||
cases, as this is used by the Core to determine if a backup password is invalid
|
||||
and offer a input field to the user to input the correct password.
|
||||
"""
|
||||
enc_tar = Path(copy(get_fixture_path("backup_example_enc.tar"), tmp_path))
|
||||
enc_backup = Backup(coresys, enc_tar, "test", None)
|
||||
await enc_backup.load()
|
||||
@@ -273,3 +292,44 @@ async def test_validate_backup(
|
||||
expected_exception,
|
||||
):
|
||||
await enc_backup.validate_backup(None)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("password", "expected_exception"),
|
||||
[
|
||||
("supervisor", does_not_raise()),
|
||||
(
|
||||
"wrong_password",
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup f92f0339"
|
||||
),
|
||||
),
|
||||
(
|
||||
None,
|
||||
pytest.raises(
|
||||
BackupInvalidError, match="Invalid password for backup f92f0339"
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_validate_backup_v3(
|
||||
coresys: CoreSys,
|
||||
tmp_path: Path,
|
||||
password: str | None,
|
||||
expected_exception: AbstractContextManager,
|
||||
):
|
||||
"""Test validate_backup with a real SecureTar v3 encrypted backup.
|
||||
|
||||
SecureTar v3 uses Argon2id key derivation and raises InvalidPasswordError
|
||||
on wrong passwords. It is paramount that BackupInvalidError is raised for
|
||||
invalid password cases, as this is used by the Core to determine if a backup
|
||||
password is invalid and offer a dialog to the user to input the correct
|
||||
password.
|
||||
"""
|
||||
v3_tar = Path(copy(get_fixture_path("backup_example_sec_v3.tar"), tmp_path))
|
||||
v3_backup = Backup(coresys, v3_tar, "test", None)
|
||||
await v3_backup.load()
|
||||
v3_backup.set_password(password)
|
||||
|
||||
with expected_exception:
|
||||
await v3_backup.validate_backup(None)
|
||||
|
||||
@@ -167,7 +167,7 @@ async def test_homeassistant_restore_rejects_path_traversal(
|
||||
traversal_info.size = 9
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
tar_file = SecureTarFile(tar_path, gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await coresys.homeassistant.restore(tar_file)
|
||||
|
||||
@@ -181,7 +181,7 @@ async def test_addon_restore_rejects_path_traversal(
|
||||
traversal_info.size = 9
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
tar_file = SecureTarFile(tar_path, gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await install_addon_ssh.restore(tar_file)
|
||||
|
||||
@@ -203,7 +203,7 @@ async def test_addon_restore_rejects_symlink_escape(
|
||||
{"escape/evil.py": b"malicious"},
|
||||
)
|
||||
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
tar_file = SecureTarFile(tar_path, gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await install_addon_ssh.restore(tar_file)
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ from blockbuster import BlockBuster, BlockBusterFunction
|
||||
from dbus_fast import BusType
|
||||
from dbus_fast.aio.message_bus import MessageBus
|
||||
import pytest
|
||||
from securetar import SecureTarFile
|
||||
from securetar import SecureTarArchive
|
||||
|
||||
from supervisor import config as su_config
|
||||
from supervisor.addons.addon import Addon
|
||||
@@ -848,7 +848,7 @@ async def backups(
|
||||
for i in range(request.param if hasattr(request, "param") else 5):
|
||||
slug = f"sn{i + 1}"
|
||||
temp_tar = Path(tmp_path, f"{slug}.tar")
|
||||
with SecureTarFile(temp_tar, "w"):
|
||||
with SecureTarArchive(temp_tar, "w"):
|
||||
pass
|
||||
backup = Backup(coresys, temp_tar, slug, None)
|
||||
backup._data = { # pylint: disable=protected-access
|
||||
|
||||
BIN
tests/fixtures/backup_example_sec_v3.tar
vendored
Normal file
BIN
tests/fixtures/backup_example_sec_v3.tar
vendored
Normal file
Binary file not shown.
Reference in New Issue
Block a user