mirror of
https://github.com/home-assistant/core.git
synced 2026-05-08 17:49:37 +01:00
Add support for reading backups using securetar v3 (#163920)
This commit is contained in:
@@ -16,6 +16,7 @@ from typing import IO, Any, cast
|
||||
|
||||
import aiohttp
|
||||
from securetar import (
|
||||
InvalidPasswordError,
|
||||
SecureTarArchive,
|
||||
SecureTarError,
|
||||
SecureTarFile,
|
||||
@@ -165,7 +166,7 @@ def validate_password(path: Path, password: str | None) -> bool:
|
||||
):
|
||||
# If we can read the tar file, the password is correct
|
||||
return True
|
||||
except tarfile.ReadError, SecureTarReadError:
|
||||
except tarfile.ReadError, InvalidPasswordError, SecureTarReadError:
|
||||
LOGGER.debug("Invalid password")
|
||||
return False
|
||||
except Exception: # noqa: BLE001
|
||||
@@ -192,13 +193,14 @@ def validate_password_stream(
|
||||
for obj in input_archive.tar:
|
||||
if not obj.name.endswith((".tar", ".tgz", ".tar.gz")):
|
||||
continue
|
||||
with input_archive.extract_tar(obj) as decrypted:
|
||||
if decrypted.plaintext_size is None:
|
||||
raise UnsupportedSecureTarVersion
|
||||
try:
|
||||
try:
|
||||
with input_archive.extract_tar(obj) as decrypted:
|
||||
if decrypted.plaintext_size is None:
|
||||
raise UnsupportedSecureTarVersion
|
||||
decrypted.read(1) # Read a single byte to trigger the decryption
|
||||
except SecureTarReadError as err:
|
||||
raise IncorrectPassword from err
|
||||
except (InvalidPasswordError, SecureTarReadError) as err:
|
||||
raise IncorrectPassword from err
|
||||
else:
|
||||
return
|
||||
raise BackupEmpty
|
||||
|
||||
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -188,7 +188,7 @@
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_can_decrypt_on_download[backup.local-c0cb53bd-hunter2]
|
||||
# name: test_can_decrypt_on_download[backup.local-backup_compressed_protected_v2-hunter2]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': None,
|
||||
@@ -196,7 +196,26 @@
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_can_decrypt_on_download[backup.local-c0cb53bd-wrong_password]
|
||||
# name: test_can_decrypt_on_download[backup.local-backup_compressed_protected_v2-wrong_password]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'password_incorrect',
|
||||
'message': 'Incorrect password',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_can_decrypt_on_download[backup.local-backup_compressed_protected_v3-hunter2]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': None,
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_can_decrypt_on_download[backup.local-backup_compressed_protected_v3-wrong_password]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'password_incorrect',
|
||||
|
||||
@@ -131,32 +131,75 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("backup", "password", "validation_result"),
|
||||
("backup", "password", "validation_result", "expected_messages"),
|
||||
[
|
||||
# Backup not protected, no password provided -> validation passes
|
||||
(Path("backup_v2_compressed.tar"), None, True),
|
||||
(Path("backup_v2_uncompressed.tar"), None, True),
|
||||
(Path("backup_compressed.tar"), None, True, []),
|
||||
(Path("backup_uncompressed.tar"), None, True, []),
|
||||
# Backup not protected, password provided -> validation fails
|
||||
(Path("backup_v2_compressed.tar"), "hunter2", False),
|
||||
(Path("backup_v2_uncompressed.tar"), "hunter2", False),
|
||||
(Path("backup_compressed.tar"), "hunter2", False, ["Invalid password"]),
|
||||
(Path("backup_uncompressed.tar"), "hunter2", False, ["Invalid password"]),
|
||||
# Backup protected, correct password provided -> validation passes
|
||||
(Path("backup_v2_compressed_protected.tar"), "hunter2", True),
|
||||
(Path("backup_v2_uncompressed_protected.tar"), "hunter2", True),
|
||||
(Path("backup_compressed_protected_v2.tar"), "hunter2", True, []),
|
||||
(Path("backup_uncompressed_protected_v2.tar"), "hunter2", True, []),
|
||||
(Path("backup_compressed_protected_v3.tar"), "hunter2", True, []),
|
||||
(Path("backup_uncompressed_protected_v3.tar"), "hunter2", True, []),
|
||||
# Backup protected, no password provided -> validation fails
|
||||
(Path("backup_v2_compressed_protected.tar"), None, False),
|
||||
(Path("backup_v2_uncompressed_protected.tar"), None, False),
|
||||
(Path("backup_compressed_protected_v2.tar"), None, False, ["Invalid password"]),
|
||||
(
|
||||
Path("backup_uncompressed_protected_v2.tar"),
|
||||
None,
|
||||
False,
|
||||
["Invalid password"],
|
||||
),
|
||||
(Path("backup_compressed_protected_v3.tar"), None, False, ["Invalid password"]),
|
||||
(
|
||||
Path("backup_uncompressed_protected_v3.tar"),
|
||||
None,
|
||||
False,
|
||||
["Invalid password"],
|
||||
),
|
||||
# Backup protected, wrong password provided -> validation fails
|
||||
(Path("backup_v2_compressed_protected.tar"), "wrong_password", False),
|
||||
(Path("backup_v2_uncompressed_protected.tar"), "wrong_password", False),
|
||||
(
|
||||
Path("backup_compressed_protected_v2.tar"),
|
||||
"wrong_password",
|
||||
False,
|
||||
["Invalid password"],
|
||||
),
|
||||
(
|
||||
Path("backup_uncompressed_protected_v2.tar"),
|
||||
"wrong_password",
|
||||
False,
|
||||
["Invalid password"],
|
||||
),
|
||||
(
|
||||
Path("backup_compressed_protected_v3.tar"),
|
||||
"wrong_password",
|
||||
False,
|
||||
["Invalid password"],
|
||||
),
|
||||
(
|
||||
Path("backup_uncompressed_protected_v3.tar"),
|
||||
"wrong_password",
|
||||
False,
|
||||
["Invalid password"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_validate_password(
|
||||
password: str | None, backup: Path, validation_result: bool
|
||||
password: str | None,
|
||||
backup: Path,
|
||||
validation_result: bool,
|
||||
expected_messages: list[str],
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test validating a password."""
|
||||
test_backups = get_fixture_path("test_backups", DOMAIN)
|
||||
|
||||
assert validate_password(test_backups / backup, password) == validation_result
|
||||
for message in expected_messages:
|
||||
assert message in caplog.text
|
||||
assert "Unexpected error validating password" not in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize("password", [None, "hunter2"])
|
||||
|
||||
@@ -4048,8 +4048,10 @@ async def test_subscribe_event(
|
||||
# Legacy backup, which can't be streamed
|
||||
("backup.local", "2bcb3113", "hunter2"),
|
||||
# New backup, which can be streamed, try with correct and wrong password
|
||||
("backup.local", "c0cb53bd", "hunter2"),
|
||||
("backup.local", "c0cb53bd", "wrong_password"),
|
||||
("backup.local", "backup_compressed_protected_v2", "hunter2"),
|
||||
("backup.local", "backup_compressed_protected_v2", "wrong_password"),
|
||||
("backup.local", "backup_compressed_protected_v3", "hunter2"),
|
||||
("backup.local", "backup_compressed_protected_v3", "wrong_password"),
|
||||
],
|
||||
)
|
||||
@pytest.mark.usefixtures("mock_backups")
|
||||
|
||||
Reference in New Issue
Block a user