1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-20 10:59:24 +00:00

Add backblaze b2 backup integration (#149627)

Co-authored-by: Hugo van Rijswijk <git@hugovr.nl>
Co-authored-by: ElCruncharino <ElCruncharino@users.noreply.github.com>
Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
ElCruncharino
2025-10-30 03:42:02 -04:00
committed by GitHub
parent 7de94f3632
commit dcb2087f4b
24 changed files with 3341 additions and 0 deletions

View File

@@ -107,6 +107,7 @@ homeassistant.components.automation.*
homeassistant.components.awair.*
homeassistant.components.axis.*
homeassistant.components.azure_storage.*
homeassistant.components.backblaze_b2.*
homeassistant.components.backup.*
homeassistant.components.baf.*
homeassistant.components.bang_olufsen.*

2
CODEOWNERS generated
View File

@@ -196,6 +196,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/azure_service_bus/ @hfurubotten
/homeassistant/components/azure_storage/ @zweckj
/tests/components/azure_storage/ @zweckj
/homeassistant/components/backblaze_b2/ @hugo-vrijswijk @ElCruncharino
/tests/components/backblaze_b2/ @hugo-vrijswijk @ElCruncharino
/homeassistant/components/backup/ @home-assistant/core
/tests/components/backup/ @home-assistant/core
/homeassistant/components/baf/ @bdraco @jfroy

View File

@@ -0,0 +1,116 @@
"""The Backblaze B2 integration."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from b2sdk.v2 import B2Api, Bucket, InMemoryAccountInfo, exception
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.event import async_track_time_interval
from .const import (
BACKBLAZE_REALM,
CONF_APPLICATION_KEY,
CONF_BUCKET,
CONF_KEY_ID,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
from .repairs import (
async_check_for_repair_issues,
create_bucket_access_restricted_issue,
create_bucket_not_found_issue,
)
_LOGGER = logging.getLogger(__name__)
type BackblazeConfigEntry = ConfigEntry[Bucket]
async def async_setup_entry(hass: HomeAssistant, entry: BackblazeConfigEntry) -> bool:
"""Set up Backblaze B2 from a config entry."""
info = InMemoryAccountInfo()
b2_api = B2Api(info)
def _authorize_and_get_bucket_sync() -> Bucket:
"""Synchronously authorize the Backblaze B2 account and retrieve the bucket.
This function runs in the event loop's executor as b2sdk operations are blocking.
"""
b2_api.authorize_account(
BACKBLAZE_REALM,
entry.data[CONF_KEY_ID],
entry.data[CONF_APPLICATION_KEY],
)
return b2_api.get_bucket_by_name(entry.data[CONF_BUCKET])
try:
bucket = await hass.async_add_executor_job(_authorize_and_get_bucket_sync)
except exception.Unauthorized as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_credentials",
) from err
except exception.RestrictedBucket as err:
create_bucket_access_restricted_issue(hass, entry, err.bucket_name)
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="restricted_bucket",
translation_placeholders={
"restricted_bucket_name": err.bucket_name,
},
) from err
except exception.NonExistentBucket as err:
create_bucket_not_found_issue(hass, entry, entry.data[CONF_BUCKET])
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="invalid_bucket_name",
) from err
except exception.ConnectionReset as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="cannot_connect",
) from err
except exception.MissingAccountData as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_auth",
) from err
entry.runtime_data = bucket
def _async_notify_backup_listeners() -> None:
"""Notify any registered backup agent listeners."""
_LOGGER.debug("Notifying backup listeners for entry %s", entry.entry_id)
for listener in hass.data.get(DATA_BACKUP_AGENT_LISTENERS, []):
listener()
entry.async_on_unload(entry.async_on_state_change(_async_notify_backup_listeners))
async def _periodic_issue_check(_now: Any) -> None:
"""Periodically check for repair issues."""
await async_check_for_repair_issues(hass, entry)
entry.async_on_unload(
async_track_time_interval(hass, _periodic_issue_check, timedelta(minutes=30))
)
hass.async_create_task(async_check_for_repair_issues(hass, entry))
return True
async def async_unload_entry(hass: HomeAssistant, entry: BackblazeConfigEntry) -> bool:
"""Unload a Backblaze B2 config entry.
Any resources directly managed by this entry that need explicit shutdown
would be handled here. In this case, the `async_on_state_change` listener
handles the notification logic on unload.
"""
return True

View File

@@ -0,0 +1,615 @@
"""Backup platform for the Backblaze B2 integration."""
import asyncio
from collections.abc import AsyncIterator, Callable, Coroutine
import functools
import json
import logging
import mimetypes
from time import time
from typing import Any
from b2sdk.v2 import FileVersion
from b2sdk.v2.exception import B2Error
from homeassistant.components.backup import (
AgentBackup,
BackupAgent,
BackupAgentError,
BackupNotFound,
suggested_filename,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.util.async_iterator import AsyncIteratorReader
from . import BackblazeConfigEntry
from .const import (
CONF_PREFIX,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
METADATA_FILE_SUFFIX,
METADATA_VERSION,
)
_LOGGER = logging.getLogger(__name__)
# Cache TTL for backup list (in seconds)
CACHE_TTL = 300
def suggested_filenames(backup: AgentBackup) -> tuple[str, str]:
"""Return the suggested filenames for the backup and metadata files."""
base_name = suggested_filename(backup).rsplit(".", 1)[0]
return f"{base_name}.tar", f"{base_name}.metadata.json"
def _parse_metadata(raw_content: str) -> dict[str, Any]:
"""Parse metadata content from JSON."""
try:
data = json.loads(raw_content)
except json.JSONDecodeError as err:
raise ValueError(f"Invalid JSON format: {err}") from err
else:
if not isinstance(data, dict):
raise TypeError("JSON content is not a dictionary")
return data
def _find_backup_file_for_metadata(
metadata_filename: str, all_files: dict[str, FileVersion], prefix: str
) -> FileVersion | None:
"""Find corresponding backup file for metadata file."""
base_name = metadata_filename[len(prefix) :].removesuffix(METADATA_FILE_SUFFIX)
return next(
(
file
for name, file in all_files.items()
if name.startswith(prefix + base_name)
and name.endswith(".tar")
and name != metadata_filename
),
None,
)
def _create_backup_from_metadata(
metadata_content: dict[str, Any], backup_file: FileVersion
) -> AgentBackup:
"""Construct an AgentBackup from parsed metadata content and the associated backup file."""
metadata = metadata_content["backup_metadata"]
metadata["size"] = backup_file.size
return AgentBackup.from_dict(metadata)
def handle_b2_errors[T](
func: Callable[..., Coroutine[Any, Any, T]],
) -> Callable[..., Coroutine[Any, Any, T]]:
"""Handle B2Errors by converting them to BackupAgentError."""
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T:
"""Catch B2Error and raise BackupAgentError."""
try:
return await func(*args, **kwargs)
except B2Error as err:
error_msg = f"Failed during {func.__name__}"
raise BackupAgentError(error_msg) from err
return wrapper
async def async_get_backup_agents(
hass: HomeAssistant,
) -> list[BackupAgent]:
"""Return a list of backup agents for all configured Backblaze B2 entries."""
entries: list[BackblazeConfigEntry] = hass.config_entries.async_loaded_entries(
DOMAIN
)
return [BackblazeBackupAgent(hass, entry) for entry in entries]
@callback
def async_register_backup_agents_listener(
hass: HomeAssistant,
*,
listener: Callable[[], None],
**kwargs: Any,
) -> Callable[[], None]:
"""Register a listener to be called when backup agents are added or removed.
:return: A function to unregister the listener.
"""
hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, []).append(listener)
@callback
def remove_listener() -> None:
"""Remove the listener."""
hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)
if not hass.data[DATA_BACKUP_AGENT_LISTENERS]:
hass.data.pop(DATA_BACKUP_AGENT_LISTENERS, None)
return remove_listener
class BackblazeBackupAgent(BackupAgent):
"""Backup agent for Backblaze B2 cloud storage."""
domain = DOMAIN
def __init__(self, hass: HomeAssistant, entry: BackblazeConfigEntry) -> None:
"""Initialize the Backblaze B2 agent."""
super().__init__()
self._hass = hass
self._bucket = entry.runtime_data
self._prefix = entry.data[CONF_PREFIX]
self.name = entry.title
self.unique_id = entry.entry_id
self._all_files_cache: dict[str, FileVersion] = {}
self._all_files_cache_expiration: float = 0.0
self._backup_list_cache: dict[str, AgentBackup] = {}
self._backup_list_cache_expiration: float = 0.0
self._all_files_cache_lock = asyncio.Lock()
self._backup_list_cache_lock = asyncio.Lock()
def _is_cache_valid(self, expiration_time: float) -> bool:
"""Check if cache is still valid based on expiration time."""
return time() <= expiration_time
async def _cleanup_failed_upload(self, filename: str) -> None:
"""Clean up a partially uploaded file after upload failure."""
_LOGGER.warning(
"Attempting to delete partially uploaded main backup file %s "
"due to metadata upload failure",
filename,
)
try:
uploaded_main_file_info = await self._hass.async_add_executor_job(
self._bucket.get_file_info_by_name, filename
)
await self._hass.async_add_executor_job(uploaded_main_file_info.delete)
except B2Error:
_LOGGER.debug(
"Failed to clean up partially uploaded main backup file %s. "
"Manual intervention may be required to delete it from Backblaze B2",
filename,
exc_info=True,
)
else:
_LOGGER.debug(
"Successfully deleted partially uploaded main backup file %s", filename
)
async def _get_file_for_download(self, backup_id: str) -> FileVersion:
"""Get backup file for download, raising if not found."""
file, _ = await self._find_file_and_metadata_version_by_id(backup_id)
if not file:
raise BackupNotFound(f"Backup {backup_id} not found")
return file
@handle_b2_errors
async def async_download_backup(
self, backup_id: str, **kwargs: Any
) -> AsyncIterator[bytes]:
"""Download a backup from Backblaze B2."""
file = await self._get_file_for_download(backup_id)
_LOGGER.debug("Downloading %s", file.file_name)
downloaded_file = await self._hass.async_add_executor_job(file.download)
response = downloaded_file.response
async def stream_response() -> AsyncIterator[bytes]:
"""Stream the response into an AsyncIterator."""
try:
iterator = response.iter_content(chunk_size=1024 * 1024)
while True:
chunk = await self._hass.async_add_executor_job(
next, iterator, None
)
if chunk is None:
break
yield chunk
finally:
_LOGGER.debug("Finished streaming download for %s", file.file_name)
return stream_response()
@handle_b2_errors
async def async_upload_backup(
self,
*,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
backup: AgentBackup,
**kwargs: Any,
) -> None:
"""Upload a backup to Backblaze B2.
This involves uploading the main backup archive and a separate metadata JSON file.
"""
tar_filename, metadata_filename = suggested_filenames(backup)
prefixed_tar_filename = self._prefix + tar_filename
prefixed_metadata_filename = self._prefix + metadata_filename
metadata_content_bytes = json.dumps(
{
"metadata_version": METADATA_VERSION,
"backup_id": backup.backup_id,
"backup_metadata": backup.as_dict(),
}
).encode("utf-8")
_LOGGER.debug(
"Uploading backup: %s, and metadata: %s",
prefixed_tar_filename,
prefixed_metadata_filename,
)
upload_successful = False
try:
await self._upload_backup_file(prefixed_tar_filename, open_stream, {})
_LOGGER.debug(
"Main backup file upload finished for %s", prefixed_tar_filename
)
_LOGGER.debug("Uploading metadata file: %s", prefixed_metadata_filename)
await self._upload_metadata_file(
metadata_content_bytes, prefixed_metadata_filename
)
_LOGGER.debug(
"Metadata file upload finished for %s", prefixed_metadata_filename
)
upload_successful = True
finally:
if upload_successful:
_LOGGER.debug("Backup upload complete: %s", prefixed_tar_filename)
self._invalidate_caches(
backup.backup_id, prefixed_tar_filename, prefixed_metadata_filename
)
else:
await self._cleanup_failed_upload(prefixed_tar_filename)
def _upload_metadata_file_sync(
self, metadata_content: bytes, filename: str
) -> None:
"""Synchronously upload metadata file to B2."""
self._bucket.upload_bytes(
metadata_content,
filename,
content_type="application/json",
file_info={"metadata_only": "true"},
)
async def _upload_metadata_file(
self, metadata_content: bytes, filename: str
) -> None:
"""Upload metadata file to B2."""
await self._hass.async_add_executor_job(
self._upload_metadata_file_sync,
metadata_content,
filename,
)
def _upload_unbound_stream_sync(
self,
reader: AsyncIteratorReader,
filename: str,
content_type: str,
file_info: dict[str, Any],
) -> FileVersion:
"""Synchronously upload unbound stream to B2."""
return self._bucket.upload_unbound_stream(
reader,
filename,
content_type=content_type,
file_info=file_info,
)
def _download_and_parse_metadata_sync(
self, metadata_file_version: FileVersion
) -> dict[str, Any]:
"""Synchronously download and parse metadata file."""
return _parse_metadata(
metadata_file_version.download().response.content.decode("utf-8")
)
async def _upload_backup_file(
self,
filename: str,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
file_info: dict[str, Any],
) -> None:
"""Upload backup file to B2 using streaming."""
_LOGGER.debug("Starting streaming upload for %s", filename)
stream = await open_stream()
reader = AsyncIteratorReader(self._hass.loop, stream)
_LOGGER.debug("Uploading backup file %s with streaming", filename)
try:
content_type, _ = mimetypes.guess_type(filename)
file_version = await self._hass.async_add_executor_job(
self._upload_unbound_stream_sync,
reader,
filename,
content_type or "application/x-tar",
file_info,
)
finally:
reader.close()
_LOGGER.debug("Successfully uploaded %s (ID: %s)", filename, file_version.id_)
@handle_b2_errors
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
"""Delete a backup and its associated metadata file from Backblaze B2."""
file, metadata_file = await self._find_file_and_metadata_version_by_id(
backup_id
)
if not file:
raise BackupNotFound(f"Backup {backup_id} not found")
# Invariant: when file is not None, metadata_file is also not None
assert metadata_file is not None
_LOGGER.debug(
"Deleting backup file: %s and metadata file: %s",
file.file_name,
metadata_file.file_name,
)
await self._hass.async_add_executor_job(file.delete)
await self._hass.async_add_executor_job(metadata_file.delete)
self._invalidate_caches(
backup_id,
file.file_name,
metadata_file.file_name,
remove_files=True,
)
@handle_b2_errors
async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
"""List all backups by finding their associated metadata files in Backblaze B2."""
async with self._backup_list_cache_lock:
if self._backup_list_cache and self._is_cache_valid(
self._backup_list_cache_expiration
):
_LOGGER.debug("Returning backups from cache")
return list(self._backup_list_cache.values())
_LOGGER.debug(
"Cache expired or empty, fetching all files from B2 to build backup list"
)
all_files_in_prefix = await self._get_all_files_in_prefix()
_LOGGER.debug(
"Files found in prefix '%s': %s",
self._prefix,
list(all_files_in_prefix.keys()),
)
# Process metadata files sequentially to avoid exhausting executor pool
backups = {}
for file_name, file_version in all_files_in_prefix.items():
if file_name.endswith(METADATA_FILE_SUFFIX):
backup = await self._hass.async_add_executor_job(
self._process_metadata_file_sync,
file_name,
file_version,
all_files_in_prefix,
)
if backup:
backups[backup.backup_id] = backup
self._backup_list_cache = backups
self._backup_list_cache_expiration = time() + CACHE_TTL
return list(backups.values())
@handle_b2_errors
async def async_get_backup(self, backup_id: str, **kwargs: Any) -> AgentBackup:
"""Get a specific backup by its ID from Backblaze B2."""
if self._backup_list_cache and self._is_cache_valid(
self._backup_list_cache_expiration
):
if backup := self._backup_list_cache.get(backup_id):
_LOGGER.debug("Returning backup %s from cache", backup_id)
return backup
file, metadata_file_version = await self._find_file_and_metadata_version_by_id(
backup_id
)
if not file or not metadata_file_version:
raise BackupNotFound(f"Backup {backup_id} not found")
metadata_content = await self._hass.async_add_executor_job(
self._download_and_parse_metadata_sync,
metadata_file_version,
)
_LOGGER.debug(
"Successfully retrieved metadata for backup ID %s from file %s",
backup_id,
metadata_file_version.file_name,
)
backup = _create_backup_from_metadata(metadata_content, file)
if self._is_cache_valid(self._backup_list_cache_expiration):
self._backup_list_cache[backup.backup_id] = backup
return backup
async def _find_file_and_metadata_version_by_id(
self, backup_id: str
) -> tuple[FileVersion | None, FileVersion | None]:
"""Find the main backup file and its associated metadata file version by backup ID."""
all_files_in_prefix = await self._get_all_files_in_prefix()
# Process metadata files sequentially to avoid exhausting executor pool
for file_name, file_version in all_files_in_prefix.items():
if file_name.endswith(METADATA_FILE_SUFFIX):
(
result_backup_file,
result_metadata_file_version,
) = await self._hass.async_add_executor_job(
self._process_metadata_file_for_id_sync,
file_name,
file_version,
backup_id,
all_files_in_prefix,
)
if result_backup_file and result_metadata_file_version:
return result_backup_file, result_metadata_file_version
_LOGGER.debug("Backup %s not found", backup_id)
return None, None
def _process_metadata_file_for_id_sync(
self,
file_name: str,
file_version: FileVersion,
target_backup_id: str,
all_files_in_prefix: dict[str, FileVersion],
) -> tuple[FileVersion | None, FileVersion | None]:
"""Synchronously process a single metadata file for a specific backup ID.
Called within a thread pool executor.
"""
try:
download_response = file_version.download().response
except B2Error as err:
_LOGGER.warning(
"Failed to download metadata file %s during ID search: %s",
file_name,
err,
)
return None, None
try:
metadata_content = _parse_metadata(
download_response.content.decode("utf-8")
)
except ValueError:
return None, None
if metadata_content["backup_id"] != target_backup_id:
_LOGGER.debug(
"Metadata file %s does not match target backup ID %s",
file_name,
target_backup_id,
)
return None, None
found_backup_file = _find_backup_file_for_metadata(
file_name, all_files_in_prefix, self._prefix
)
if not found_backup_file:
_LOGGER.warning(
"Found metadata file %s for backup ID %s, but no corresponding backup file",
file_name,
target_backup_id,
)
return None, None
_LOGGER.debug(
"Found backup file %s and metadata file %s for ID %s",
found_backup_file.file_name,
file_name,
target_backup_id,
)
return found_backup_file, file_version
async def _get_all_files_in_prefix(self) -> dict[str, FileVersion]:
"""Get all file versions in the configured prefix from Backblaze B2.
Uses a cache to minimize API calls.
This fetches a flat list of all files, including main backups and metadata files.
"""
async with self._all_files_cache_lock:
if self._is_cache_valid(self._all_files_cache_expiration):
_LOGGER.debug("Returning all files from cache")
return self._all_files_cache
_LOGGER.debug("Cache for all files expired or empty, fetching from B2")
all_files_in_prefix = await self._hass.async_add_executor_job(
self._fetch_all_files_in_prefix
)
self._all_files_cache = all_files_in_prefix
self._all_files_cache_expiration = time() + CACHE_TTL
return all_files_in_prefix
def _fetch_all_files_in_prefix(self) -> dict[str, FileVersion]:
"""Fetch all files in the configured prefix from B2."""
all_files: dict[str, FileVersion] = {}
for file, _ in self._bucket.ls(self._prefix):
all_files[file.file_name] = file
return all_files
def _process_metadata_file_sync(
self,
file_name: str,
file_version: FileVersion,
all_files_in_prefix: dict[str, FileVersion],
) -> AgentBackup | None:
"""Synchronously process a single metadata file and return an AgentBackup if valid."""
try:
download_response = file_version.download().response
except B2Error as err:
_LOGGER.warning("Failed to download metadata file %s: %s", file_name, err)
return None
try:
metadata_content = _parse_metadata(
download_response.content.decode("utf-8")
)
except ValueError:
return None
found_backup_file = _find_backup_file_for_metadata(
file_name, all_files_in_prefix, self._prefix
)
if not found_backup_file:
_LOGGER.warning(
"Found metadata file %s but no corresponding backup file",
file_name,
)
return None
_LOGGER.debug(
"Successfully processed metadata file %s for backup ID %s",
file_name,
metadata_content["backup_id"],
)
return _create_backup_from_metadata(metadata_content, found_backup_file)
def _invalidate_caches(
self,
backup_id: str,
tar_filename: str,
metadata_filename: str | None,
*,
remove_files: bool = False,
) -> None:
"""Invalidate caches after upload/deletion operations.
Args:
backup_id: The backup ID to remove from backup cache
tar_filename: The tar filename to remove from files cache
metadata_filename: The metadata filename to remove from files cache
remove_files: If True, remove specific files from cache; if False, expire entire cache
"""
if remove_files:
if self._is_cache_valid(self._all_files_cache_expiration):
self._all_files_cache.pop(tar_filename, None)
if metadata_filename:
self._all_files_cache.pop(metadata_filename, None)
if self._is_cache_valid(self._backup_list_cache_expiration):
self._backup_list_cache.pop(backup_id, None)
else:
# For uploads, we can't easily add new FileVersion objects without API calls,
# so we expire the entire cache for simplicity
self._all_files_cache_expiration = 0.0
self._backup_list_cache_expiration = 0.0

View File

@@ -0,0 +1,288 @@
"""Config flow for the Backblaze B2 integration."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
from b2sdk.v2 import B2Api, InMemoryAccountInfo, exception
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import (
BACKBLAZE_REALM,
CONF_APPLICATION_KEY,
CONF_BUCKET,
CONF_KEY_ID,
CONF_PREFIX,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
# Constants
REQUIRED_CAPABILITIES = {"writeFiles", "listFiles", "deleteFiles", "readFiles"}
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_KEY_ID): cv.string,
vol.Required(CONF_APPLICATION_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
vol.Required(CONF_BUCKET): cv.string,
vol.Optional(CONF_PREFIX, default=""): cv.string,
}
)
class BackblazeConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Backblaze B2."""
VERSION = 1
reauth_entry: ConfigEntry[Any] | None
def _abort_if_duplicate_credentials(self, user_input: dict[str, Any]) -> None:
"""Abort if credentials already exist in another entry."""
self._async_abort_entries_match(
{
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
}
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
errors, placeholders = await self._async_validate_backblaze_connection(
user_input
)
if not errors:
if user_input[CONF_PREFIX] and not user_input[CONF_PREFIX].endswith(
"/"
):
user_input[CONF_PREFIX] += "/"
return self.async_create_entry(
title=user_input[CONF_BUCKET], data=user_input
)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
description_placeholders={"brand_name": "Backblaze B2", **placeholders},
)
async def _async_validate_backblaze_connection(
self, user_input: dict[str, Any]
) -> tuple[dict[str, str], dict[str, str]]:
"""Validate Backblaze B2 credentials, bucket, capabilities, and prefix.
Returns a tuple of (errors_dict, placeholders_dict).
"""
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
info = InMemoryAccountInfo()
b2_api = B2Api(info)
def _authorize_and_get_bucket_sync() -> None:
"""Synchronously authorize the account and get the bucket by name.
This function is run in the executor because b2sdk operations are blocking.
"""
b2_api.authorize_account(
BACKBLAZE_REALM, # Use the defined realm constant
user_input[CONF_KEY_ID],
user_input[CONF_APPLICATION_KEY],
)
b2_api.get_bucket_by_name(user_input[CONF_BUCKET])
try:
await self.hass.async_add_executor_job(_authorize_and_get_bucket_sync)
allowed = b2_api.account_info.get_allowed()
# Check if allowed info is available
if allowed is None or not allowed.get("capabilities"):
errors["base"] = "invalid_capability"
placeholders["missing_capabilities"] = ", ".join(
sorted(REQUIRED_CAPABILITIES)
)
else:
# Check if all required capabilities are present
current_caps = set(allowed["capabilities"])
if not REQUIRED_CAPABILITIES.issubset(current_caps):
missing_caps = REQUIRED_CAPABILITIES - current_caps
_LOGGER.warning(
"Missing required Backblaze B2 capabilities for Key ID '%s': %s",
user_input[CONF_KEY_ID],
", ".join(sorted(missing_caps)),
)
errors["base"] = "invalid_capability"
placeholders["missing_capabilities"] = ", ".join(
sorted(missing_caps)
)
else:
# Only check prefix if capabilities are valid
configured_prefix: str = user_input[CONF_PREFIX]
allowed_prefix = allowed.get("namePrefix") or ""
# Ensure configured prefix starts with Backblaze B2's allowed prefix
if allowed_prefix and not configured_prefix.startswith(
allowed_prefix
):
errors[CONF_PREFIX] = "invalid_prefix"
placeholders["allowed_prefix"] = allowed_prefix
except exception.Unauthorized:
_LOGGER.debug(
"Backblaze B2 authentication failed for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "invalid_credentials"
except exception.RestrictedBucket as err:
_LOGGER.debug(
"Access to Backblaze B2 bucket '%s' is restricted: %s",
user_input[CONF_BUCKET],
err,
)
placeholders["restricted_bucket_name"] = err.bucket_name
errors[CONF_BUCKET] = "restricted_bucket"
except exception.NonExistentBucket:
_LOGGER.debug(
"Backblaze B2 bucket '%s' does not exist", user_input[CONF_BUCKET]
)
errors[CONF_BUCKET] = "invalid_bucket_name"
except exception.ConnectionReset:
_LOGGER.error("Failed to connect to Backblaze B2. Connection reset")
errors["base"] = "cannot_connect"
except exception.MissingAccountData:
# This generally indicates an issue with how InMemoryAccountInfo is used
_LOGGER.error(
"Missing account data during Backblaze B2 authorization for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "invalid_credentials"
except Exception:
_LOGGER.exception(
"An unexpected error occurred during Backblaze B2 configuration for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "unknown"
return errors, placeholders
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle reauthentication flow."""
self.reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"]
)
assert self.reauth_entry is not None
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauthentication."""
assert self.reauth_entry is not None
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
validation_input = {
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
CONF_BUCKET: self.reauth_entry.data[CONF_BUCKET],
CONF_PREFIX: self.reauth_entry.data[CONF_PREFIX],
}
errors, placeholders = await self._async_validate_backblaze_connection(
validation_input
)
if not errors:
return self.async_update_reload_and_abort(
self.reauth_entry,
data_updates={
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
},
)
return self.async_show_form(
step_id="reauth_confirm",
data_schema=vol.Schema(
{
vol.Required(CONF_KEY_ID): cv.string,
vol.Required(CONF_APPLICATION_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
}
),
errors=errors,
description_placeholders={
"brand_name": "Backblaze B2",
"bucket": self.reauth_entry.data[CONF_BUCKET],
**placeholders,
},
)
async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle reconfiguration flow."""
entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
assert entry is not None
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
errors, placeholders = await self._async_validate_backblaze_connection(
user_input
)
if not errors:
if user_input[CONF_PREFIX] and not user_input[CONF_PREFIX].endswith(
"/"
):
user_input[CONF_PREFIX] += "/"
return self.async_update_reload_and_abort(
entry,
data_updates=user_input,
)
else:
errors = {}
placeholders = {}
return self.async_show_form(
step_id="reconfigure",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input or entry.data
),
errors=errors,
description_placeholders={"brand_name": "Backblaze B2", **placeholders},
)

View File

@@ -0,0 +1,22 @@
"""Constants for the Backblaze B2 integration."""
from collections.abc import Callable
from typing import Final
from homeassistant.util.hass_dict import HassKey
DOMAIN: Final = "backblaze_b2"
CONF_KEY_ID = "key_id"
CONF_APPLICATION_KEY = "application_key"
CONF_BUCKET = "bucket"
CONF_PREFIX = "prefix"
DATA_BACKUP_AGENT_LISTENERS: HassKey[list[Callable[[], None]]] = HassKey(
f"{DOMAIN}.backup_agent_listeners"
)
METADATA_FILE_SUFFIX = ".metadata.json"
METADATA_VERSION = "1"
BACKBLAZE_REALM = "production"

View File

@@ -0,0 +1,56 @@
"""Diagnostics support for Backblaze B2."""
from __future__ import annotations
from typing import Any
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.core import HomeAssistant
from . import BackblazeConfigEntry
from .const import CONF_APPLICATION_KEY, CONF_KEY_ID
TO_REDACT_ENTRY_DATA = {CONF_APPLICATION_KEY, CONF_KEY_ID}
TO_REDACT_ACCOUNT_DATA_ALLOWED = {"bucketId", "bucketName", "namePrefix"}
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: BackblazeConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
bucket = entry.runtime_data
try:
bucket_info = {
"name": bucket.name,
"id": bucket.id_,
"type": bucket.type_,
"cors_rules": bucket.cors_rules,
"lifecycle_rules": bucket.lifecycle_rules,
"revision": bucket.revision,
}
account_info = bucket.api.account_info
account_data: dict[str, Any] = {
"account_id": account_info.get_account_id(),
"api_url": account_info.get_api_url(),
"download_url": account_info.get_download_url(),
"minimum_part_size": account_info.get_minimum_part_size(),
"allowed": account_info.get_allowed(),
}
if isinstance(account_data["allowed"], dict):
account_data["allowed"] = async_redact_data(
account_data["allowed"], TO_REDACT_ACCOUNT_DATA_ALLOWED
)
except (AttributeError, TypeError, ValueError, KeyError):
bucket_info = {"name": "unknown", "id": "unknown"}
account_data = {"error": "Failed to retrieve detailed account information"}
return {
"entry_data": async_redact_data(entry.data, TO_REDACT_ENTRY_DATA),
"entry_options": entry.options,
"bucket_info": bucket_info,
"account_info": account_data,
}

View File

@@ -0,0 +1,12 @@
{
"domain": "backblaze_b2",
"name": "Backblaze B2",
"codeowners": ["@hugo-vrijswijk", "@ElCruncharino"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/backblaze_b2",
"integration_type": "service",
"iot_class": "cloud_push",
"loggers": ["b2sdk"],
"quality_scale": "bronze",
"requirements": ["b2sdk==2.8.1"]
}

View File

@@ -0,0 +1,124 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not register custom actions.
appropriate-polling:
status: exempt
comment: Integration does not poll.
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: This integration does not have any custom actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: Entities of this integration do not explicitly subscribe to events.
entity-unique-id:
status: exempt
comment: |
This integration does not have entities.
has-entity-name:
status: exempt
comment: |
This integration does not have entities.
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: Integration does not register custom actions.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: This integration does not have an options flow.
docs-installation-parameters: done
entity-unavailable:
status: exempt
comment: This integration does not have entities.
integration-owner: done
log-when-unavailable:
status: exempt
comment: This integration does not have entities.
parallel-updates:
status: exempt
comment: This integration does not poll.
reauthentication-flow: done
test-coverage: done
# Gold
devices:
status: exempt
comment: This integration does not have entities.
diagnostics: done
discovery-update-info:
status: exempt
comment: Backblaze B2 is a cloud service that is not discovered on the network.
discovery:
status: exempt
comment: Backblaze B2 is a cloud service that is not discovered on the network.
docs-data-update:
status: exempt
comment: This integration does not poll.
docs-examples:
status: exempt
comment: The integration extends core functionality and does not require examples.
docs-known-limitations: done
docs-supported-devices:
status: exempt
comment: This integration does not support physical devices.
docs-supported-functions:
status: exempt
comment: This integration does not have entities.
docs-troubleshooting: todo
docs-use-cases: done
dynamic-devices:
status: exempt
comment: This integration does not have devices.
entity-category:
status: exempt
comment: This integration does not have entities.
entity-device-class:
status: exempt
comment: This integration does not have entities.
entity-disabled-by-default:
status: exempt
comment: This integration does not have entities.
entity-translations:
status: exempt
comment: This integration does not have entities.
exception-translations: done
icon-translations:
status: exempt
comment: This integration does not use icons.
reconfiguration-flow: done
repair-issues: done
stale-devices:
status: exempt
comment: This integration does not have devices.
# Platinum
async-dependency:
status: exempt
comment: |
The b2sdk library is synchronous by design. All sync operations are properly
wrapped with async_add_executor_job to prevent blocking the event loop.
inject-websession:
status: exempt
comment: |
The b2sdk library does not support custom HTTP session injection.
It manages HTTP connections internally through its own session management.
strict-typing:
status: exempt
comment: |
The b2sdk dependency does not include a py.typed file and is not PEP 561 compliant.
This is outside the integration's control as it's a third-party library requirement.

View File

@@ -0,0 +1,93 @@
"""Repair issues for the Backblaze B2 integration."""
from __future__ import annotations
import logging
from b2sdk.v2.exception import (
B2Error,
NonExistentBucket,
RestrictedBucket,
Unauthorized,
)
from homeassistant.components.repairs import ConfirmRepairFlow
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import issue_registry as ir
from .const import CONF_BUCKET, DOMAIN
_LOGGER = logging.getLogger(__name__)
ISSUE_BUCKET_ACCESS_RESTRICTED = "bucket_access_restricted"
ISSUE_BUCKET_NOT_FOUND = "bucket_not_found"
def _create_issue(
hass: HomeAssistant,
entry: ConfigEntry,
issue_type: str,
bucket_name: str,
) -> None:
"""Create a repair issue with standard parameters."""
ir.async_create_issue(
hass,
DOMAIN,
f"{issue_type}_{entry.entry_id}",
is_fixable=False,
issue_domain=DOMAIN,
severity=ir.IssueSeverity.ERROR,
translation_key=issue_type,
translation_placeholders={
"brand_name": "Backblaze B2",
"title": entry.title,
"bucket_name": bucket_name,
"entry_id": entry.entry_id,
},
)
def create_bucket_access_restricted_issue(
hass: HomeAssistant, entry: ConfigEntry, bucket_name: str
) -> None:
"""Create a repair issue for restricted bucket access."""
_create_issue(hass, entry, ISSUE_BUCKET_ACCESS_RESTRICTED, bucket_name)
def create_bucket_not_found_issue(
hass: HomeAssistant, entry: ConfigEntry, bucket_name: str
) -> None:
"""Create a repair issue for non-existent bucket."""
_create_issue(hass, entry, ISSUE_BUCKET_NOT_FOUND, bucket_name)
async def async_check_for_repair_issues(
hass: HomeAssistant, entry: ConfigEntry
) -> None:
"""Check for common issues that require user action."""
bucket = entry.runtime_data
restricted_issue_id = f"{ISSUE_BUCKET_ACCESS_RESTRICTED}_{entry.entry_id}"
not_found_issue_id = f"{ISSUE_BUCKET_NOT_FOUND}_{entry.entry_id}"
try:
await hass.async_add_executor_job(bucket.api.account_info.get_allowed)
ir.async_delete_issue(hass, DOMAIN, restricted_issue_id)
ir.async_delete_issue(hass, DOMAIN, not_found_issue_id)
except Unauthorized:
entry.async_start_reauth(hass)
except RestrictedBucket as err:
_create_issue(hass, entry, ISSUE_BUCKET_ACCESS_RESTRICTED, err.bucket_name)
except NonExistentBucket:
_create_issue(hass, entry, ISSUE_BUCKET_NOT_FOUND, entry.data[CONF_BUCKET])
except B2Error as err:
_LOGGER.debug("B2 connectivity test failed: %s", err)
async def async_create_fix_flow(
hass: HomeAssistant,
issue_id: str,
data: dict[str, str | int | float | None] | None,
) -> ConfirmRepairFlow:
"""Create a fix flow for Backblaze B2 issues."""
return ConfirmRepairFlow()

View File

@@ -0,0 +1,92 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_bucket_name": "[%key:component::backblaze_b2::exceptions::invalid_bucket_name::message%]",
"invalid_capability": "[%key:component::backblaze_b2::exceptions::invalid_capability::message%]",
"invalid_credentials": "[%key:component::backblaze_b2::exceptions::invalid_credentials::message%]",
"invalid_prefix": "[%key:component::backblaze_b2::exceptions::invalid_prefix::message%]",
"restricted_bucket": "[%key:component::backblaze_b2::exceptions::restricted_bucket::message%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"reauth_confirm": {
"data": {
"application_key": "Application key",
"key_id": "Key ID"
},
"data_description": {
"application_key": "Application key to connect to {brand_name}",
"key_id": "Key ID to connect to {brand_name}"
},
"description": "Update your {brand_name} credentials for bucket {bucket}.",
"title": "Reauthenticate {brand_name}"
},
"reconfigure": {
"data": {
"application_key": "Application key",
"bucket": "Bucket name",
"key_id": "Key ID",
"prefix": "Folder prefix (optional)"
},
"data_description": {
"application_key": "Application key to connect to {brand_name}",
"bucket": "Bucket must already exist and be writable by the provided credentials.",
"key_id": "Key ID to connect to {brand_name}",
"prefix": "Directory path to store backup files in. Leave empty to store in the root."
},
"title": "Reconfigure {brand_name}"
},
"user": {
"data": {
"application_key": "Application key",
"bucket": "Bucket name",
"key_id": "Key ID",
"prefix": "Folder prefix (optional)"
},
"data_description": {
"application_key": "Application key to connect to {brand_name}",
"bucket": "Bucket must already exist and be writable by the provided credentials.",
"key_id": "Key ID to connect to {brand_name}",
"prefix": "Directory path to store backup files in. Leave empty to store in the root."
},
"title": "Add {brand_name} backup"
}
}
},
"exceptions": {
"cannot_connect": {
"message": "Cannot connect to endpoint"
},
"invalid_bucket_name": {
"message": "Bucket does not exist or is not writable by the provided credentials."
},
"invalid_capability": {
"message": "Application key does not have the required read/write capabilities."
},
"invalid_credentials": {
"message": "Bucket cannot be accessed using provided of key ID and application key."
},
"invalid_prefix": {
"message": "Prefix is not allowed for provided key. Must start with {allowed_prefix}."
},
"restricted_bucket": {
"message": "Application key is restricted to bucket {restricted_bucket_name}."
}
},
"issues": {
"bucket_access_restricted": {
"description": "Access to your {brand_name} bucket {bucket_name} is restricted for the current credentials. This means your application key may only have access to specific buckets, but not this one. To fix this issue:\n\n1. Log in to your {brand_name} account\n2. Check your application key restrictions\n3. Either use a different bucket that your key can access, or create a new application key with access to {bucket_name}\n4. Go to Settings > Devices & Services > {brand_name} and reconfigure the integration settings\n\nOnce you update the integration settings, this issue will be automatically resolved.",
"title": "{brand_name} bucket access restricted"
},
"bucket_not_found": {
"description": "The {brand_name} bucket {bucket_name} cannot be found or accessed. This could mean:\n\n1. The bucket was deleted\n2. The bucket name was changed\n3. Your credentials no longer have access to this bucket\n\nTo fix this issue:\n\n1. Log in to your {brand_name} account\n2. Verify the bucket still exists and check its name\n3. Ensure your application key has access to this bucket\n4. Go to Settings > Devices & Services > {brand_name} and reconfigure the integration settings\n\nOnce you update the integration settings, this issue will be automatically resolved.",
"title": "{brand_name} bucket not found"
}
}
}

View File

@@ -86,6 +86,7 @@ FLOWS = {
"azure_devops",
"azure_event_hub",
"azure_storage",
"backblaze_b2",
"baf",
"balboa",
"bang_olufsen",

View File

@@ -635,6 +635,12 @@
"config_flow": true,
"iot_class": "local_push"
},
"backblaze_b2": {
"name": "Backblaze B2",
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_push"
},
"backup": {
"name": "Backup",
"integration_type": "service",

10
mypy.ini generated
View File

@@ -825,6 +825,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.backblaze_b2.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.backup.*]
check_untyped_defs = true
disallow_incomplete_defs = true

3
requirements_all.txt generated
View File

@@ -607,6 +607,9 @@ azure-servicebus==7.10.0
# homeassistant.components.azure_storage
azure-storage-blob==12.24.0
# homeassistant.components.backblaze_b2
b2sdk==2.8.1
# homeassistant.components.holiday
babel==2.15.0

View File

@@ -553,6 +553,9 @@ azure-kusto-ingest==4.5.1
# homeassistant.components.azure_storage
azure-storage-blob==12.24.0
# homeassistant.components.backblaze_b2
b2sdk==2.8.1
# homeassistant.components.holiday
babel==2.15.0

View File

@@ -0,0 +1,14 @@
"""Tests for the Backblaze B2 integration."""
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def setup_integration(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:
"""Set up the backblaze B2 integration for testing."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()

View File

@@ -0,0 +1,405 @@
"""Common fixtures for the Backblaze B2 tests."""
from collections.abc import Generator
import hashlib
import io
import json
import time
from typing import Any
from unittest.mock import AsyncMock, Mock, patch
from b2sdk._internal.raw_simulator import BucketSimulator
from b2sdk.v2 import FileVersion, RawSimulator
import pytest
from homeassistant.components.backblaze_b2.const import (
CONF_APPLICATION_KEY,
CONF_BUCKET,
CONF_KEY_ID,
DOMAIN,
)
from homeassistant.components.backup import AgentBackup
from .const import BACKUP_METADATA, TEST_BACKUP, USER_INPUT
from tests.common import MockConfigEntry
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Override async_setup_entry."""
with patch(
"homeassistant.components.backblaze_b2.async_setup_entry", return_value=True
) as mock_setup_entry:
yield mock_setup_entry
@pytest.fixture(autouse=True)
def b2_fixture():
"""Create account and application keys."""
sim = RawSimulator()
allowed = {
"capabilities": [
"writeFiles",
"listFiles",
"deleteFiles",
"readFiles",
]
}
account_info = AccountInfo(allowed)
with (
patch("b2sdk.v2.B2Api", return_value=sim) as mock_client,
patch("homeassistant.components.backblaze_b2.B2Api", return_value=sim),
patch.object(
RawSimulator,
"get_bucket_by_name",
RawSimulator._get_bucket_by_name,
create=True,
),
patch.object(RawSimulator, "account_info", account_info, create=True),
):
sim: RawSimulator = mock_client.return_value
account_id, application_key = sim.create_account()
auth = sim.authorize_account("production", account_id, application_key)
auth_token: str = auth["authorizationToken"]
api_url: str = auth["apiInfo"]["storageApi"]["apiUrl"]
key = sim.create_key(
api_url=api_url,
account_auth_token=auth_token,
account_id=account_id,
key_name="testkey",
capabilities=[
"writeFiles",
"listFiles",
"deleteFiles",
"readFiles",
],
valid_duration_seconds=None,
bucket_id=None,
name_prefix=None,
)
application_key_id: str = key["applicationKeyId"]
application_key: str = key["applicationKey"]
bucket = sim.create_bucket(
api_url=api_url,
account_id=account_id,
account_auth_token=auth_token,
bucket_name=USER_INPUT[CONF_BUCKET],
bucket_type="allPrivate",
)
upload_url = sim.get_upload_url(api_url, auth_token, bucket["bucketId"])
test_backup_data = b"This is the actual backup data for the tar file."
stream_backup = io.BytesIO(test_backup_data)
stream_backup.seek(0)
backup_filename = f"{TEST_BACKUP.backup_id}.tar"
sha1_backup = hashlib.sha1(test_backup_data).hexdigest()
file_backup_upload_result = sim.upload_file(
upload_url["uploadUrl"],
upload_url["authorizationToken"],
f"testprefix/{backup_filename}",
len(test_backup_data),
"application/octet-stream",
sha1_backup,
{"backup_metadata": json.dumps(BACKUP_METADATA)},
stream_backup,
)
metadata_json_content_bytes = json.dumps(BACKUP_METADATA).encode("utf-8")
stream_metadata = io.BytesIO(metadata_json_content_bytes)
stream_metadata.seek(0)
metadata_filename = f"{TEST_BACKUP.backup_id}.metadata.json"
sha1_metadata = hashlib.sha1(metadata_json_content_bytes).hexdigest()
file_metadata_upload_result = sim.upload_file(
upload_url["uploadUrl"],
upload_url["authorizationToken"],
f"testprefix/{metadata_filename}",
len(metadata_json_content_bytes),
"application/json",
sha1_metadata,
{},
stream_metadata,
)
uploaded_files_results = [
file_backup_upload_result,
file_metadata_upload_result,
]
class MockDownloadedFile:
def __init__(self, content: bytes) -> None:
self._content = content
@property
def text_content(self) -> str:
return self._content.decode("utf-8")
@property
def response(self):
mock_response = Mock()
mock_response.iter_content.return_value = iter([self._content])
mock_response.content = self._content
return mock_response
def mock_sim_download_file_by_id(
file_id,
file_name=None,
progress_listener=None,
range_=None,
encryption=None,
):
for file_data in uploaded_files_results:
if file_data["fileId"] == file_id or (
file_name and file_data["fileName"] == file_name
):
if file_data["fileName"].endswith(".metadata.json"):
return MockDownloadedFile(metadata_json_content_bytes)
return MockDownloadedFile(test_backup_data)
raise ValueError(
f"Mocked download_file_by_id: File with id {file_id} or name {file_name} not found."
)
def ls(
self,
prefix: str = "",
) -> list[tuple[FileVersion, str]]:
"""List files in the bucket."""
listed_files = []
for file_data in uploaded_files_results:
if prefix and not file_data["fileName"].startswith(prefix):
continue
listed_files.append(
(
FileVersion(
sim,
file_data["fileId"],
file_data["fileName"],
file_data["contentLength"],
file_data.get("contentType", "application/octet-stream"),
file_data["fileInfo"].get("sha1", ""),
file_data["fileInfo"],
file_data["uploadTimestamp"],
file_data["accountId"],
file_data["bucketId"],
"upload",
None,
None,
),
file_data["fileName"],
)
)
return listed_files
original_init = BucketSimulator.__init__
def patched_init(self, *args, **kwargs):
original_init(self, *args, **kwargs)
self.name = self.bucket_name
self.id_ = self.bucket_id
self.type_ = self.bucket_type
self.cors_rules = []
self.lifecycle_rules = []
self.revision = 1
def mock_start_large_file(
file_name, content_type, file_info, account_auth_token
):
mock_large_file = Mock()
mock_large_file.file_name = file_name
mock_large_file.file_id = "mock_file_id"
return mock_large_file
def mock_cancel_large_file(file_id, account_auth_token):
pass
def mock_upload_bytes(
self,
data_bytes,
file_name,
content_type=None,
file_info=None,
):
"""Mock upload_bytes for metadata uploads."""
stream = io.BytesIO(data_bytes)
stream.seek(0)
sha1_hash = hashlib.sha1(data_bytes).hexdigest()
return sim.upload_file(
upload_url["uploadUrl"],
upload_url["authorizationToken"],
file_name,
content_length=len(data_bytes),
content_type=content_type or "application/octet-stream",
content_sha1=sha1_hash,
file_info=file_info or {},
data_stream=stream,
)
def mock_upload_unbound_stream(
self,
stream_reader,
file_name,
content_type=None,
file_info=None,
):
"""Mock upload_unbound_stream for backup uploads."""
# Read all data from the stream
data = b""
while True:
chunk = stream_reader.read(8192)
if not chunk:
break
data += chunk
stream = io.BytesIO(data)
stream.seek(0)
return FileVersion(
sim,
"test_file_id",
file_name,
len(data),
content_type or "application/octet-stream",
hashlib.sha1(data).hexdigest(),
file_info or {},
int(time.time() * 1000),
account_id,
bucket["bucketId"],
"upload",
None,
None,
)
def mock_upload_local_file(
local_file,
file_name,
content_type=None,
file_info=None,
progress_listener=None,
):
with open(local_file, "rb") as f:
content = f.read()
stream = io.BytesIO(content)
stream.seek(0)
return sim.upload_file(
upload_url["uploadUrl"],
upload_url["authorizationToken"],
file_name,
content_length=len(content),
content_type=content_type or "application/octet-stream",
content_sha1=None,
file_info=file_info or {},
data_stream=stream,
)
import b2sdk.v2.bucket # noqa: PLC0415
with (
patch.object(
sim, "download_file_by_id", mock_sim_download_file_by_id, create=True
),
patch.object(BucketSimulator, "ls", ls, create=True),
patch.object(BucketSimulator, "__init__", patched_init),
patch.object(
BucketSimulator, "start_large_file", mock_start_large_file, create=True
),
patch.object(
BucketSimulator,
"cancel_large_file",
mock_cancel_large_file,
create=True,
),
patch.object(
BucketSimulator, "upload_bytes", mock_upload_bytes, create=True
),
patch.object(
BucketSimulator,
"upload_unbound_stream",
mock_upload_unbound_stream,
create=True,
),
patch.object(
b2sdk.v2.bucket.Bucket, "upload_local_file", mock_upload_local_file
),
):
yield BackblazeFixture(
application_key_id, application_key, bucket, sim, auth
)
@pytest.fixture
def backup_fixture(request: pytest.FixtureRequest) -> AgentBackup:
"""Test backup fixture."""
return TEST_BACKUP
@pytest.fixture
def mock_config_entry(b2_fixture: Any) -> MockConfigEntry:
"""Return the default mocked config entry."""
return MockConfigEntry(
entry_id="c6dd4663ec2c75fe04701be54c03f27b",
title="test",
domain=DOMAIN,
data={
**USER_INPUT,
CONF_KEY_ID: b2_fixture.key_id,
CONF_APPLICATION_KEY: b2_fixture.application_key,
},
)
class BackblazeFixture:
"""Mock Backblaze B2 account."""
def __init__( # noqa: D107
self,
key_id: str,
application_key: str,
bucket: dict[str, Any],
sim: RawSimulator,
auth: dict[str, Any],
) -> None:
self.key_id = key_id
self.application_key = application_key
self.bucket = bucket
self.sim = sim
self.auth = auth
self.api_url = auth["apiInfo"]["storageApi"]["apiUrl"]
self.account_id = auth["accountId"]
class AccountInfo:
"""Mock account info."""
def __init__(self, allowed: dict[str, Any]) -> None: # noqa: D107
self._allowed = allowed
def get_allowed(self):
"""Return allowed capabilities."""
return self._allowed
def get_account_id(self):
"""Return account ID."""
return "test_account_id"
def get_api_url(self):
"""Return API URL."""
return "https://api001.backblazeb2.com"
def get_download_url(self):
"""Return download URL."""
return "https://f001.backblazeb2.com"
def get_minimum_part_size(self):
"""Return minimum part size."""
return 5000000

View File

@@ -0,0 +1,32 @@
"""Consts for Backblaze B2 tests."""
from homeassistant.components.backblaze_b2.const import CONF_BUCKET, CONF_PREFIX
from homeassistant.components.backup import AgentBackup
USER_INPUT = {
CONF_BUCKET: "testBucket",
CONF_PREFIX: "testprefix/",
}
TEST_BACKUP = AgentBackup(
addons=[],
backup_id="23e64aec",
date="2024-11-22T11:48:48.727189+01:00",
database_included=True,
extra_metadata={},
folders=[],
homeassistant_included=True,
homeassistant_version="2024.12.0.dev0",
name="Core 2024.12.0.dev0",
protected=False,
size=48,
)
BACKUP_METADATA = {
"metadata_version": "1",
"backup_id": "23e64aec",
"backup_metadata": TEST_BACKUP.as_dict(),
}
METADATA_FILE_SUFFIX = ".metadata.json"

View File

@@ -0,0 +1,865 @@
"""Backblaze B2 backup agent tests."""
from collections.abc import AsyncGenerator
from io import StringIO
import json
import logging
import threading
import time
from unittest.mock import Mock, patch
from b2sdk._internal.raw_simulator import BucketSimulator
from b2sdk.v2.exception import B2Error
import pytest
from homeassistant.components.backblaze_b2.backup import (
_parse_metadata,
async_register_backup_agents_listener,
)
from homeassistant.components.backblaze_b2.const import (
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
from homeassistant.components.backup import DOMAIN as BACKUP_DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from . import setup_integration
from .const import BACKUP_METADATA, TEST_BACKUP
from tests.common import MockConfigEntry
from tests.typing import ClientSessionGenerator, MagicMock, WebSocketGenerator
@pytest.fixture(autouse=True)
async def setup_backup_integration(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> AsyncGenerator[None]:
"""Set up integration."""
with (
patch("homeassistant.components.backup.is_hassio", return_value=False),
patch("homeassistant.components.backup.store.STORE_DELAY_SAVE", 0),
):
assert await async_setup_component(hass, BACKUP_DOMAIN, {})
await setup_integration(hass, mock_config_entry)
await hass.async_block_till_done()
yield
@pytest.fixture
def mock_backup_files():
"""Create standard mock backup file and metadata file."""
mock_main = Mock()
mock_main.file_name = f"testprefix/{TEST_BACKUP.backup_id}.tar"
mock_main.size = TEST_BACKUP.size
mock_main.delete = Mock()
mock_metadata = Mock()
mock_metadata.file_name = f"testprefix/{TEST_BACKUP.backup_id}.metadata.json"
mock_metadata.size = 100
mock_download = Mock()
mock_response = Mock()
mock_response.content = json.dumps(BACKUP_METADATA).encode()
mock_download.response = mock_response
mock_metadata.download = Mock(return_value=mock_download)
mock_metadata.delete = Mock()
return mock_main, mock_metadata
async def test_agents_info(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test agent info."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/agents/info"})
response = await client.receive_json()
assert response["success"]
assert any(
agent["agent_id"] == f"{DOMAIN}.{mock_config_entry.entry_id}"
for agent in response["result"]["agents"]
)
async def test_agents_list_backups(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test listing backups."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
assert "backups" in response["result"]
async def test_agents_get_backup(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test getting backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response = await client.receive_json()
assert response["success"]
if response["result"]["backup"]:
assert response["result"]["backup"]["backup_id"] == TEST_BACKUP.backup_id
async def test_agents_get_backup_not_found(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test getting nonexistent backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/details", "backup_id": "random"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["backup"] is None
async def test_agents_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_backup_files,
) -> None:
"""Test deleting backup."""
client = await hass_ws_client(hass)
mock_main, mock_metadata = mock_backup_files
def mock_ls(_self, _prefix=""):
return iter([(mock_main, None), (mock_metadata, None)])
with patch.object(BucketSimulator, "ls", mock_ls):
await client.send_json_auto_id(
{"type": "backup/delete", "backup_id": TEST_BACKUP.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}}
mock_main.delete.assert_called_once()
mock_metadata.delete.assert_called_once()
async def test_agents_delete_not_found(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test deleting nonexistent backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/delete", "backup_id": "random"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}}
async def test_agents_download(
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test downloading backup."""
client = await hass_client()
resp = await client.get(
f"/api/backup/download/{TEST_BACKUP.backup_id}?agent_id={DOMAIN}.{mock_config_entry.entry_id}"
)
assert resp.status == 200
async def test_agents_download_not_found(
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test downloading nonexistent backup."""
client = await hass_client()
resp = await client.get(
f"/api/backup/download/nonexistent?agent_id={DOMAIN}.{mock_config_entry.entry_id}"
)
assert resp.status == 404
async def test_get_file_for_download_raises_not_found(
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test exception handling for nonexistent backup."""
client = await hass_client()
def mock_ls_empty(_self, _prefix=""):
return iter([])
with patch.object(BucketSimulator, "ls", mock_ls_empty):
resp = await client.get(
f"/api/backup/download/nonexistent?agent_id={DOMAIN}.{mock_config_entry.entry_id}"
)
assert resp.status == 404
@pytest.mark.parametrize(
("error_type", "exception"),
[
("b2_error", B2Error),
("runtime_error", RuntimeError),
],
)
async def test_error_during_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
mock_backup_files,
error_type: str,
exception: type[Exception],
) -> None:
"""Test error handling during deletion."""
client = await hass_ws_client(hass)
mock_main, mock_metadata = mock_backup_files
mock_metadata.delete = Mock(side_effect=exception("Delete failed"))
def mock_ls(_self, _prefix=""):
return iter([(mock_main, None), (mock_metadata, None)])
with patch.object(BucketSimulator, "ls", mock_ls):
await client.send_json_auto_id(
{"type": "backup/delete", "backup_id": TEST_BACKUP.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert (
f"{DOMAIN}.{mock_config_entry.entry_id}" in response["result"]["agent_errors"]
)
async def test_listeners_get_cleaned_up(hass: HomeAssistant) -> None:
"""Test listener cleanup."""
listener = MagicMock()
remove_listener = async_register_backup_agents_listener(hass, listener=listener)
hass.data[DATA_BACKUP_AGENT_LISTENERS] = [listener] # type: ignore[misc]
remove_listener()
assert DATA_BACKUP_AGENT_LISTENERS not in hass.data
async def test_parse_metadata_invalid_json() -> None:
"""Test metadata parsing."""
with pytest.raises(ValueError, match="Invalid JSON format"):
_parse_metadata("invalid json")
with pytest.raises(TypeError, match="JSON content is not a dictionary"):
_parse_metadata('["not", "a", "dict"]')
async def test_error_during_list_backups(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test error handling during list."""
client = await hass_ws_client(hass)
def mock_ls_error(_self, _prefix=""):
raise B2Error("API error")
with patch.object(BucketSimulator, "ls", mock_ls_error):
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
assert (
f"{DOMAIN}.{mock_config_entry.entry_id}" in response["result"]["agent_errors"]
)
async def test_error_during_get_backup(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test error handling during get."""
client = await hass_ws_client(hass)
def mock_ls_error(_self, _prefix=""):
raise B2Error("API error")
with patch.object(BucketSimulator, "ls", mock_ls_error):
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": "test_backup"}
)
response = await client.receive_json()
assert response["success"]
assert (
f"{DOMAIN}.{mock_config_entry.entry_id}" in response["result"]["agent_errors"]
)
async def test_metadata_file_download_error_during_list(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test metadata download error handling."""
client = await hass_ws_client(hass)
mock_metadata = Mock()
mock_metadata.file_name = "testprefix/test.metadata.json"
mock_metadata.download = Mock(side_effect=B2Error("Download failed"))
mock_tar = Mock()
mock_tar.file_name = "testprefix/test.tar"
def mock_ls(_self, _prefix=""):
return iter([(mock_metadata, None), (mock_tar, None)])
with patch.object(BucketSimulator, "ls", mock_ls):
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
async def test_delete_with_metadata_error(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
mock_backup_files,
) -> None:
"""Test error handling during metadata deletion."""
client = await hass_ws_client(hass)
mock_main, mock_metadata = mock_backup_files
mock_metadata.delete = Mock(side_effect=B2Error("Delete failed"))
def mock_ls(_self, _prefix=""):
return iter([(mock_main, None), (mock_metadata, None)])
with patch.object(BucketSimulator, "ls", mock_ls):
await client.send_json_auto_id(
{"type": "backup/delete", "backup_id": TEST_BACKUP.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert (
f"{DOMAIN}.{mock_config_entry.entry_id}" in response["result"]["agent_errors"]
)
mock_main.delete.assert_called_once()
mock_metadata.delete.assert_called_once()
async def test_download_backup_not_found(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test downloading nonexistent backup."""
client = await hass_ws_client(hass)
def mock_ls_empty(_self, _prefix=""):
return iter([])
with patch.object(BucketSimulator, "ls", mock_ls_empty):
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": "nonexistent"}
)
response = await client.receive_json()
assert response["success"]
assert response["result"]["backup"] is None
async def test_metadata_file_invalid_json_during_list(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test invalid metadata handling."""
client = await hass_ws_client(hass)
mock_metadata = Mock()
mock_metadata.file_name = "testprefix/bad.metadata.json"
mock_download = Mock()
mock_response = Mock()
mock_response.content = b"not valid json"
mock_download.response = mock_response
mock_metadata.download = Mock(return_value=mock_download)
mock_tar = Mock()
mock_tar.file_name = "testprefix/bad.tar"
def mock_ls(_self, _prefix=""):
return iter([(mock_metadata, None), (mock_tar, None)])
with (
patch.object(BucketSimulator, "ls", mock_ls),
caplog.at_level(logging.WARNING),
):
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
async def test_upload_with_cleanup_and_logging(
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test upload logging and cleanup."""
client = await hass_client()
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=TEST_BACKUP,
),
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=TEST_BACKUP,
),
patch("pathlib.Path.open") as mocked_open,
caplog.at_level(logging.DEBUG),
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
resp = await client.post(
f"/api/backup/upload?agent_id={DOMAIN}.{mock_config_entry.entry_id}",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert any("Main backup file upload finished" in msg for msg in caplog.messages)
assert any("Metadata file upload finished" in msg for msg in caplog.messages)
assert any("Backup upload complete" in msg for msg in caplog.messages)
caplog.clear()
mock_file_info = Mock()
mock_file_info.delete = Mock()
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=TEST_BACKUP,
),
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=TEST_BACKUP,
),
patch("pathlib.Path.open") as mocked_open,
patch.object(
BucketSimulator,
"upload_bytes",
side_effect=B2Error("Metadata upload failed"),
),
patch.object(
BucketSimulator, "get_file_info_by_name", return_value=mock_file_info
),
caplog.at_level(logging.DEBUG),
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
resp = await client.post(
f"/api/backup/upload?agent_id={DOMAIN}.{mock_config_entry.entry_id}",
data={"file": StringIO("test")},
)
assert resp.status == 201
mock_file_info.delete.assert_called_once()
assert any(
"Successfully deleted partially uploaded" in msg for msg in caplog.messages
)
async def test_upload_with_cleanup_failure(
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test upload with cleanup failure when metadata upload fails."""
client = await hass_client()
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=TEST_BACKUP,
),
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=TEST_BACKUP,
),
patch("pathlib.Path.open") as mocked_open,
patch.object(
BucketSimulator,
"upload_bytes",
side_effect=B2Error("Metadata upload failed"),
),
patch.object(
BucketSimulator,
"get_file_info_by_name",
side_effect=B2Error("Cleanup failed"),
),
caplog.at_level(logging.DEBUG),
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
resp = await client.post(
f"/api/backup/upload?agent_id={DOMAIN}.{mock_config_entry.entry_id}",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert any(
"Failed to clean up partially uploaded main backup file" in msg
for msg in caplog.messages
)
async def test_cache_behavior(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test backup list caching."""
client = await hass_ws_client(hass)
call_count = []
original_ls = BucketSimulator.ls
def ls_with_counter(self, prefix=""):
call_count.append(1)
return original_ls(self, prefix)
with patch.object(BucketSimulator, "ls", ls_with_counter):
await client.send_json_auto_id({"type": "backup/info"})
response1 = await client.receive_json()
assert response1["success"]
first_call_count = len(call_count)
assert first_call_count > 0
await client.send_json_auto_id({"type": "backup/info"})
response2 = await client.receive_json()
assert response2["success"]
assert len(call_count) == first_call_count
assert response1["result"]["backups"] == response2["result"]["backups"]
async def test_metadata_processing_errors(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test metadata error handling."""
client = await hass_ws_client(hass)
mock_metadata = Mock()
mock_metadata.file_name = "testprefix/test.metadata.json"
mock_metadata.download = Mock(side_effect=B2Error("Download failed"))
mock_tar = Mock()
mock_tar.file_name = "testprefix/test.tar"
def mock_ls_download_error(_self, _prefix=""):
return iter([(mock_metadata, None), (mock_tar, None)])
with (
patch.object(BucketSimulator, "ls", mock_ls_download_error),
caplog.at_level(logging.WARNING),
):
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
assert "backups" in response["result"]
caplog.clear()
mock_metadata3 = Mock()
mock_metadata3.file_name = f"testprefix/{TEST_BACKUP.backup_id}.metadata.json"
mock_metadata3.download = Mock(side_effect=B2Error("Download failed"))
mock_tar3 = Mock()
mock_tar3.file_name = f"testprefix/{TEST_BACKUP.backup_id}.tar"
def mock_ls_id_error(_self, _prefix=""):
return iter([(mock_metadata3, None), (mock_tar3, None)])
with patch.object(BucketSimulator, "ls", mock_ls_id_error):
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert response["result"]["backup"] is None
caplog.clear()
mock_metadata4 = Mock()
mock_metadata4.file_name = f"testprefix/{TEST_BACKUP.backup_id}.metadata.json"
mock_download4 = Mock()
mock_response4 = Mock()
mock_response4.content = b"invalid json"
mock_download4.response = mock_response4
mock_metadata4.download = Mock(return_value=mock_download4)
mock_tar4 = Mock()
mock_tar4.file_name = f"testprefix/{TEST_BACKUP.backup_id}.tar"
def mock_ls_invalid_json(_self, _prefix=""):
return iter([(mock_metadata4, None), (mock_tar4, None)])
with patch.object(BucketSimulator, "ls", mock_ls_invalid_json):
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert response["result"]["backup"] is None
async def test_download_triggers_backup_not_found(
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
mock_backup_files,
) -> None:
"""Test race condition where backup disappears during download."""
client = await hass_client()
mock_main, mock_metadata = mock_backup_files
ls_call_count = [0]
def mock_ls_race_condition(_self, _prefix=""):
ls_call_count[0] += 1
if ls_call_count[0] == 1:
return iter([(mock_main, None), (mock_metadata, None)])
return iter([])
with (
patch.object(BucketSimulator, "ls", mock_ls_race_condition),
patch("homeassistant.components.backblaze_b2.backup.CACHE_TTL", 0),
):
resp = await client.get(
f"/api/backup/download/{TEST_BACKUP.backup_id}?agent_id={DOMAIN}.{mock_config_entry.entry_id}"
)
assert resp.status == 404
async def test_get_backup_cache_paths(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test cache hit and update paths."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/info"})
response1 = await client.receive_json()
assert response1["success"]
assert len(response1["result"]["backups"]) > 0
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response2 = await client.receive_json()
assert response2["success"]
assert response2["result"]["backup"]["backup_id"] == TEST_BACKUP.backup_id
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response3 = await client.receive_json()
assert response3["success"]
assert response3["result"]["backup"]["backup_id"] == TEST_BACKUP.backup_id
async def test_metadata_json_parse_error(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test ValueError handling when metadata JSON parsing fails."""
client = await hass_ws_client(hass)
mock_metadata = Mock()
mock_metadata.file_name = f"testprefix/{TEST_BACKUP.backup_id}.metadata.json"
mock_download = Mock()
mock_response = Mock()
mock_response.content = b"{ invalid json }"
mock_download.response = mock_response
mock_metadata.download = Mock(return_value=mock_download)
mock_tar = Mock()
mock_tar.file_name = f"testprefix/{TEST_BACKUP.backup_id}.tar"
def mock_ls(_self, _prefix=""):
return iter([(mock_metadata, None), (mock_tar, None)])
with patch.object(BucketSimulator, "ls", mock_ls):
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert response["result"]["backup"] is None
async def test_orphaned_metadata_files(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test handling of metadata files without corresponding tar files."""
client = await hass_ws_client(hass)
mock_metadata = Mock()
mock_metadata.file_name = f"testprefix/{TEST_BACKUP.backup_id}.metadata.json"
mock_download = Mock()
mock_response = Mock()
mock_response.content = json.dumps(BACKUP_METADATA).encode()
mock_download.response = mock_response
mock_metadata.download = Mock(return_value=mock_download)
def mock_ls(_self, _prefix=""):
return iter([(mock_metadata, None)])
with (
patch.object(BucketSimulator, "ls", mock_ls),
caplog.at_level(logging.WARNING),
):
await client.send_json_auto_id({"type": "backup/info"})
response1 = await client.receive_json()
assert response1["success"]
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response2 = await client.receive_json()
assert response2["success"]
assert response2["result"]["backup"] is None
assert any(
"no corresponding backup file" in record.message for record in caplog.records
)
async def test_get_backup_updates_cache(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_backup_files,
) -> None:
"""Test cache update when metadata initially fails then succeeds."""
client = await hass_ws_client(hass)
mock_main, mock_metadata = mock_backup_files
download_call_count = [0]
def mock_download():
download_call_count[0] += 1
mock_download_obj = Mock()
mock_response = Mock()
if download_call_count[0] == 1:
mock_response.content = b"{ invalid json }"
else:
mock_response.content = json.dumps(BACKUP_METADATA).encode()
mock_download_obj.response = mock_response
return mock_download_obj
mock_metadata.download = mock_download
def mock_ls(_self, _prefix=""):
return iter([(mock_main, None), (mock_metadata, None)])
with patch.object(BucketSimulator, "ls", mock_ls):
await client.send_json_auto_id({"type": "backup/info"})
response1 = await client.receive_json()
assert response1["success"]
assert len(response1["result"]["backups"]) == 0
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": TEST_BACKUP.backup_id}
)
response2 = await client.receive_json()
assert response2["success"]
assert response2["result"]["backup"]["backup_id"] == TEST_BACKUP.backup_id
async def test_delete_clears_backup_cache(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_backup_files,
) -> None:
"""Test that deleting a backup clears it from cache."""
client = await hass_ws_client(hass)
mock_main, mock_metadata = mock_backup_files
def mock_ls(_self, _prefix=""):
return iter([(mock_main, None), (mock_metadata, None)])
with patch.object(BucketSimulator, "ls", mock_ls):
await client.send_json_auto_id({"type": "backup/info"})
response1 = await client.receive_json()
assert response1["success"]
assert len(response1["result"]["backups"]) > 0
await client.send_json_auto_id(
{"type": "backup/delete", "backup_id": TEST_BACKUP.backup_id}
)
response2 = await client.receive_json()
assert response2["success"]
mock_main.delete.assert_called_once()
mock_metadata.delete.assert_called_once()
async def test_metadata_downloads_are_sequential(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test that metadata downloads are processed sequentially to avoid exhausting executor pool."""
current_concurrent = 0
max_concurrent = 0
lock = threading.Lock()
def mock_download_sync():
nonlocal current_concurrent, max_concurrent
with lock:
current_concurrent += 1
max_concurrent = max(max_concurrent, current_concurrent)
time.sleep(0.05)
with lock:
current_concurrent -= 1
mock_download_obj = Mock()
mock_response = Mock()
mock_response.content = json.dumps(BACKUP_METADATA).encode()
mock_download_obj.response = mock_response
return mock_download_obj
mock_files = []
for i in range(15):
mock_metadata = Mock()
mock_metadata.file_name = f"testprefix/backup{i}.metadata.json"
mock_metadata.download = mock_download_sync
mock_tar = Mock()
mock_tar.file_name = f"testprefix/backup{i}.tar"
mock_tar.size = TEST_BACKUP.size
mock_files.extend([(mock_metadata, None), (mock_tar, None)])
def mock_ls(_self, _prefix=""):
return iter(mock_files)
await hass.config_entries.async_unload(mock_config_entry.entry_id)
await hass.async_block_till_done()
with patch.object(BucketSimulator, "ls", mock_ls):
await setup_integration(hass, mock_config_entry)
await hass.async_block_till_done()
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
# Verify downloads were sequential (max 1 at a time)
assert max_concurrent == 1

View File

@@ -0,0 +1,329 @@
"""Backblaze B2 config flow tests."""
from unittest.mock import patch
from b2sdk.v2 import exception
import pytest
from homeassistant.components.backblaze_b2.const import (
CONF_APPLICATION_KEY,
CONF_KEY_ID,
DOMAIN,
)
from homeassistant.config_entries import (
SOURCE_REAUTH,
SOURCE_RECONFIGURE,
SOURCE_USER,
ConfigFlowResult,
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .conftest import BackblazeFixture
from .const import USER_INPUT
from tests.common import MockConfigEntry
async def _async_start_flow(
hass: HomeAssistant,
key_id: str,
application_key: str,
user_input: dict[str, str] | None = None,
) -> ConfigFlowResult:
"""Initialize the config flow."""
if user_input is None:
user_input = USER_INPUT
user_input[CONF_KEY_ID] = key_id
user_input[CONF_APPLICATION_KEY] = application_key
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "user"
assert result.get("errors") == {}
return await hass.config_entries.flow.async_configure(result["flow_id"], user_input)
async def test_basic_flows(hass: HomeAssistant, b2_fixture: BackblazeFixture) -> None:
"""Test basic successful config flows."""
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key
)
assert result.get("type") is FlowResultType.CREATE_ENTRY
assert result.get("title") == "testBucket"
assert result.get("data") == USER_INPUT
async def test_prefix_normalization(
hass: HomeAssistant, b2_fixture: BackblazeFixture
) -> None:
"""Test prefix normalization in config flow."""
user_input = {**USER_INPUT, "prefix": "test-prefix/foo"}
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key, user_input
)
assert result.get("type") is FlowResultType.CREATE_ENTRY
assert result["data"]["prefix"] == "test-prefix/foo/"
async def test_empty_prefix(hass: HomeAssistant, b2_fixture: BackblazeFixture) -> None:
"""Test empty prefix handling."""
user_input_empty = {**USER_INPUT, "prefix": ""}
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key, user_input_empty
)
assert result.get("type") is FlowResultType.CREATE_ENTRY
assert result["data"]["prefix"] == ""
async def test_already_configured(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
b2_fixture: BackblazeFixture,
) -> None:
"""Test abort if already configured."""
mock_config_entry.add_to_hass(hass)
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key
)
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "already_configured"
@pytest.mark.parametrize(
("error_type", "setup", "expected_error", "expected_field"),
[
(
"invalid_auth",
{"key_id": "invalid", "app_key": "invalid"},
"invalid_credentials",
"base",
),
(
"invalid_bucket",
{"bucket": "invalid-bucket-name"},
"invalid_bucket_name",
"bucket",
),
(
"cannot_connect",
{
"patch": "b2sdk.v2.RawSimulator.authorize_account",
"exception": exception.ConnectionReset,
"args": ["test"],
},
"cannot_connect",
"base",
),
(
"restricted_bucket",
{
"patch": "b2sdk.v2.RawSimulator.get_bucket_by_name",
"exception": exception.RestrictedBucket,
"args": ["testBucket"],
},
"restricted_bucket",
"bucket",
),
(
"missing_account_data",
{
"patch": "b2sdk.v2.RawSimulator.authorize_account",
"exception": exception.MissingAccountData,
"args": ["key"],
},
"invalid_credentials",
"base",
),
(
"invalid_capability",
{"mock_capabilities": ["writeFiles", "listFiles", "deleteFiles"]},
"invalid_capability",
"base",
),
(
"no_allowed_info",
{"mock_allowed": None},
"invalid_capability",
"base",
),
(
"no_capabilities",
{"mock_allowed": {}},
"invalid_capability",
"base",
),
("invalid_prefix", {"mock_prefix": "test/"}, "invalid_prefix", "prefix"),
(
"unknown_error",
{
"patch": "b2sdk.v2.RawSimulator.authorize_account",
"exception": RuntimeError,
"args": ["Unexpected error"],
},
"unknown",
"base",
),
],
)
async def test_config_flow_errors(
hass: HomeAssistant,
b2_fixture: BackblazeFixture,
error_type: str,
setup: dict,
expected_error: str,
expected_field: str,
) -> None:
"""Test various config flow error scenarios."""
if error_type == "invalid_auth":
result = await _async_start_flow(hass, setup["key_id"], setup["app_key"])
elif error_type == "invalid_bucket":
invalid_input = {**USER_INPUT, "bucket": setup["bucket"]}
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key, invalid_input
)
elif "patch" in setup:
with patch(setup["patch"], side_effect=setup["exception"](*setup["args"])):
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key
)
elif "mock_capabilities" in setup:
with patch(
"b2sdk.v2.RawSimulator.account_info.get_allowed",
return_value={"capabilities": setup["mock_capabilities"]},
):
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key
)
elif "mock_allowed" in setup:
with patch(
"b2sdk.v2.RawSimulator.account_info.get_allowed",
return_value=setup["mock_allowed"],
):
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key
)
elif "mock_prefix" in setup:
with patch(
"b2sdk.v2.RawSimulator.account_info.get_allowed",
return_value={
"capabilities": ["writeFiles", "listFiles", "deleteFiles", "readFiles"],
"namePrefix": setup["mock_prefix"],
},
):
result = await _async_start_flow(
hass, b2_fixture.key_id, b2_fixture.application_key
)
assert result.get("type") is FlowResultType.FORM
assert result.get("errors") == {expected_field: expected_error}
if error_type == "restricted_bucket":
assert result.get("description_placeholders") == {
"brand_name": "Backblaze B2",
"restricted_bucket_name": "testBucket",
}
elif error_type == "invalid_prefix":
assert result.get("description_placeholders") == {
"brand_name": "Backblaze B2",
"allowed_prefix": "test/",
}
@pytest.mark.parametrize(
("flow_type", "scenario"),
[
("reauth", "success"),
("reauth", "invalid_credentials"),
("reconfigure", "success"),
("reconfigure", "prefix_normalization"),
("reconfigure", "validation_error"),
],
)
async def test_advanced_flows(
hass: HomeAssistant,
b2_fixture: BackblazeFixture,
mock_config_entry: MockConfigEntry,
flow_type: str,
scenario: str,
) -> None:
"""Test reauthentication and reconfiguration flows."""
mock_config_entry.add_to_hass(hass)
if flow_type == "reauth":
source = SOURCE_REAUTH
step_name = "reauth_confirm"
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": source, "entry_id": mock_config_entry.entry_id},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == step_name
if scenario == "success":
config = {
CONF_KEY_ID: b2_fixture.key_id,
CONF_APPLICATION_KEY: b2_fixture.application_key,
}
result = await hass.config_entries.flow.async_configure(
result["flow_id"], config
)
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "reauth_successful"
else: # invalid_credentials
config = {CONF_KEY_ID: "invalid", CONF_APPLICATION_KEY: "invalid"}
result = await hass.config_entries.flow.async_configure(
result["flow_id"], config
)
assert result.get("type") is FlowResultType.FORM
assert result.get("errors") == {"base": "invalid_credentials"}
elif flow_type == "reconfigure":
source = SOURCE_RECONFIGURE
step_name = "reconfigure"
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": source, "entry_id": mock_config_entry.entry_id},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == step_name
if scenario == "success":
config = {
CONF_KEY_ID: b2_fixture.key_id,
CONF_APPLICATION_KEY: b2_fixture.application_key,
"bucket": "testBucket",
"prefix": "new_prefix/",
}
elif scenario == "prefix_normalization":
config = {
CONF_KEY_ID: b2_fixture.key_id,
CONF_APPLICATION_KEY: b2_fixture.application_key,
"bucket": "testBucket",
"prefix": "no_slash_prefix",
}
else: # validation_error
config = {
CONF_KEY_ID: "invalid_key",
CONF_APPLICATION_KEY: "invalid_app_key",
"bucket": "invalid_bucket",
"prefix": "",
}
result = await hass.config_entries.flow.async_configure(
result["flow_id"], config
)
if scenario == "validation_error":
assert result.get("type") is FlowResultType.FORM
assert result.get("errors") == {"base": "invalid_credentials"}
else:
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "reconfigure_successful"

View File

@@ -0,0 +1,88 @@
"""Test Backblaze B2 diagnostics."""
from unittest.mock import Mock, patch
from homeassistant.components.backblaze_b2.diagnostics import (
async_get_config_entry_diagnostics,
)
from homeassistant.core import HomeAssistant
from . import setup_integration
from tests.common import MockConfigEntry
async def test_diagnostics_basic(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:
"""Test basic diagnostics data collection."""
await setup_integration(hass, mock_config_entry)
result = await async_get_config_entry_diagnostics(hass, mock_config_entry)
assert "entry_data" in result
assert "entry_options" in result
assert "bucket_info" in result
assert "account_info" in result
# Check that sensitive data is redacted
assert mock_config_entry.data["key_id"] not in str(result["entry_data"])
assert mock_config_entry.data["application_key"] not in str(result["entry_data"])
async def test_diagnostics_error_handling(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:
"""Test diagnostics handles errors gracefully."""
mock_config_entry.runtime_data = None
mock_config_entry.add_to_hass(hass)
result = await async_get_config_entry_diagnostics(hass, mock_config_entry)
assert "bucket_info" in result
assert "account_info" in result
async def test_diagnostics_bucket_data_redaction(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:
"""Test diagnostics redacts bucket-specific sensitive data."""
await setup_integration(hass, mock_config_entry)
mock_bucket = Mock()
mock_bucket.name = "test-bucket"
mock_bucket.id_ = "bucket_id_123"
mock_bucket.type_ = "allPrivate"
mock_bucket.cors_rules = []
mock_bucket.lifecycle_rules = []
mock_bucket.revision = 1
mock_api = Mock()
mock_account_info = Mock()
mock_account_info.get_account_id.return_value = "account123"
mock_account_info.get_api_url.return_value = "https://api.backblazeb2.com"
mock_account_info.get_download_url.return_value = "https://f001.backblazeb2.com"
mock_account_info.get_minimum_part_size.return_value = 5000000
mock_account_info.get_allowed.return_value = {
"capabilities": ["writeFiles", "listFiles", "readFiles"],
"bucketId": "test_bucket_id_123",
"bucketName": "restricted_bucket",
"namePrefix": "restricted/path/",
}
mock_bucket.api = mock_api
mock_api.account_info = mock_account_info
with patch.object(mock_config_entry, "runtime_data", mock_bucket):
result = await async_get_config_entry_diagnostics(hass, mock_config_entry)
account_data = result["account_info"]
assert account_data["allowed"]["capabilities"] == [
"writeFiles",
"listFiles",
"readFiles",
]
assert account_data["allowed"]["bucketId"] == "**REDACTED**"
assert account_data["allowed"]["bucketName"] == "**REDACTED**"
assert account_data["allowed"]["namePrefix"] == "**REDACTED**"

View File

@@ -0,0 +1,106 @@
"""Test the Backblaze B2 storage integration."""
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
from b2sdk.v2 import exception
import pytest
from homeassistant.components.backblaze_b2.const import CONF_APPLICATION_KEY
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from . import setup_integration
from tests.common import MockConfigEntry
async def test_load_unload_config_entry(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test loading and unloading the integration."""
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state == ConfigEntryState.LOADED
await hass.config_entries.async_unload(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state == ConfigEntryState.NOT_LOADED # type: ignore[comparison-overlap]
async def test_setup_entry_invalid_auth(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test setup entry with invalid auth."""
mock_config = MockConfigEntry(
entry_id=mock_config_entry.entry_id,
title=mock_config_entry.title,
domain=mock_config_entry.domain,
data={
**mock_config_entry.data,
CONF_APPLICATION_KEY: "invalid_key_id",
},
)
await setup_integration(hass, mock_config)
assert mock_config.state is ConfigEntryState.SETUP_ERROR
@pytest.mark.parametrize(
("exception", "state"),
[
(exception.Unauthorized("msg", "code"), ConfigEntryState.SETUP_ERROR),
(exception.RestrictedBucket("testBucket"), ConfigEntryState.SETUP_RETRY),
(exception.NonExistentBucket(), ConfigEntryState.SETUP_RETRY),
(exception.ConnectionReset(), ConfigEntryState.SETUP_RETRY),
(exception.MissingAccountData("key"), ConfigEntryState.SETUP_ERROR),
],
)
async def test_setup_entry_restricted_bucket(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
exception: Exception,
state: ConfigEntryState,
) -> None:
"""Test setup entry with restricted bucket."""
with patch(
"b2sdk.v2.RawSimulator.get_bucket_by_name",
side_effect=exception,
):
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is state
async def test_periodic_issue_check(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:
"""Test periodic issue check functionality."""
captured_callback = None
def capture_callback(hass: HomeAssistant | None, callback, interval):
nonlocal captured_callback
captured_callback = callback
return MagicMock()
with (
patch(
"homeassistant.components.backblaze_b2.async_check_for_repair_issues",
new_callable=AsyncMock,
) as mock_check,
patch(
"homeassistant.components.backblaze_b2.async_track_time_interval",
side_effect=capture_callback,
),
):
await setup_integration(hass, mock_config_entry)
assert captured_callback is not None
await captured_callback(datetime.now())
assert mock_check.call_count == 2 # setup + callback
mock_check.assert_called_with(hass, mock_config_entry)

View File

@@ -0,0 +1,58 @@
"""Test Backblaze B2 repairs."""
from unittest.mock import Mock, patch
from b2sdk.v2.exception import (
B2Error,
NonExistentBucket,
RestrictedBucket,
Unauthorized,
)
import pytest
from homeassistant.components.backblaze_b2.repairs import (
async_check_for_repair_issues,
async_create_fix_flow,
)
from homeassistant.components.repairs import ConfirmRepairFlow
from homeassistant.core import HomeAssistant
from homeassistant.helpers import issue_registry as ir
from tests.common import MockConfigEntry
@pytest.fixture
def mock_entry():
"""Create a mock config entry with runtime data."""
entry = MockConfigEntry(domain="backblaze_b2", data={"bucket": "test"})
entry.runtime_data = Mock()
return entry
@pytest.mark.parametrize(
("exception", "expected_issues"),
[
(Unauthorized("test", "auth_failed"), 0), # Handled by reauth flow
(RestrictedBucket("test"), 1), # Creates repair issue
(NonExistentBucket("test"), 1), # Creates repair issue
(B2Error("test"), 0), # Just logs, no issue
],
)
async def test_repair_issue_creation(
hass: HomeAssistant,
mock_entry: MockConfigEntry,
exception: Exception,
expected_issues: int,
) -> None:
"""Test repair issue creation for different exception types."""
with patch.object(hass, "async_add_executor_job", side_effect=exception):
await async_check_for_repair_issues(hass, mock_entry)
issues = ir.async_get(hass).issues
assert len(issues) == expected_issues
async def test_async_create_fix_flow(hass: HomeAssistant) -> None:
"""Test creating repair fix flow."""
flow = await async_create_fix_flow(hass, "test_issue", None)
assert isinstance(flow, ConfirmRepairFlow)