
IDrive e2 backup provider (#144910)

Co-authored-by: Josef Zweck <josef@zweck.dev>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Author: Patrick Vorgers
Date: 2026-02-13 00:49:03 +01:00
Committed by: GitHub
Parent: 196c6d9839
Commit: d930755f92
18 changed files with 1912 additions and 0 deletions

CODEOWNERS (generated)

@@ -762,6 +762,8 @@ build.json @home-assistant/supervisor
/tests/components/icloud/ @Quentame @nzapponi
/homeassistant/components/idasen_desk/ @abmantis
/tests/components/idasen_desk/ @abmantis
/homeassistant/components/idrive_e2/ @patrickvorgers
/tests/components/idrive_e2/ @patrickvorgers
/homeassistant/components/igloohome/ @keithle888
/tests/components/igloohome/ @keithle888
/homeassistant/components/ign_sismologia/ @exxamalte

homeassistant/components/idrive_e2/__init__.py

@@ -0,0 +1,99 @@
"""The IDrive e2 integration."""
from __future__ import annotations
import logging
from typing import Any, cast
from aiobotocore.client import AioBaseClient as S3Client
from aiobotocore.session import AioSession
from aiohttp import ClientError as AiohttpClientError
from botocore.exceptions import ClientError, ConnectionError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from .const import (
CONF_ACCESS_KEY_ID,
CONF_BUCKET,
CONF_ENDPOINT_URL,
CONF_SECRET_ACCESS_KEY,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
type IDriveE2ConfigEntry = ConfigEntry[S3Client]
_LOGGER = logging.getLogger(__name__)
async def _async_safe_client_close(client: S3Client | None) -> None:
"""Close client without masking the original exception."""
if client is None:
return
try:
# Best-effort close; never mask the original setup exception
await client.close()
except (AiohttpClientError, OSError, RuntimeError):
_LOGGER.debug("Failed to close aiobotocore client", exc_info=True)
async def async_setup_entry(hass: HomeAssistant, entry: IDriveE2ConfigEntry) -> bool:
"""Set up IDrive e2 from a config entry."""
session = AioSession()
client: S3Client | None = None
try:
# pylint: disable-next=unnecessary-dunder-call
client = await session.create_client(
"s3",
endpoint_url=entry.data[CONF_ENDPOINT_URL],
aws_secret_access_key=entry.data[CONF_SECRET_ACCESS_KEY],
aws_access_key_id=entry.data[CONF_ACCESS_KEY_ID],
).__aenter__()
await cast(Any, client).head_bucket(Bucket=entry.data[CONF_BUCKET])
except ClientError as err:
await _async_safe_client_close(client)
code = str(err.response.get("Error", {}).get("Code", ""))
if code in ("404", "NoSuchBucket"):
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="bucket_not_found",
translation_placeholders={"bucket": entry.data[CONF_BUCKET]},
) from err
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_credentials",
) from err
except ValueError as err:
await _async_safe_client_close(client)
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_endpoint_url",
) from err
except ConnectionError as err:
await _async_safe_client_close(client)
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="cannot_connect",
) from err
entry.runtime_data = client
def notify_backup_listeners() -> None:
for listener in hass.data.get(DATA_BACKUP_AGENT_LISTENERS, []):
listener()
entry.async_on_unload(entry.async_on_state_change(notify_backup_listeners))
return True
async def async_unload_entry(hass: HomeAssistant, entry: IDriveE2ConfigEntry) -> bool:
"""Unload a config entry."""
client = entry.runtime_data
await client.close()
return True
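
Note that async_setup_entry enters the client's context manager by hand via __aenter__() instead of async with: the client must outlive setup, since it is stored in entry.runtime_data and only closed later in async_unload_entry (or by _async_safe_client_close when setup fails). A minimal sketch of that lifecycle outside Home Assistant, with open_client as a hypothetical helper:

from aiobotocore.session import AioSession

async def open_client(endpoint: str, key_id: str, secret: str):
    """Open an S3 client that the caller closes later (hypothetical helper)."""
    session = AioSession()
    # create_client() returns an async context manager; entering it manually
    # yields a live client that survives past this function's return.
    client = await session.create_client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=key_id,
        aws_secret_access_key=secret,
    ).__aenter__()
    return client  # caller must eventually run: await client.close()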

homeassistant/components/idrive_e2/backup.py

@@ -0,0 +1,362 @@
"""Backup platform for the IDrive e2 integration."""
from collections.abc import AsyncIterator, Callable, Coroutine
import functools
import json
import logging
from time import time
from typing import Any, cast
from aiobotocore.client import AioBaseClient as S3Client
from botocore.exceptions import BotoCoreError
from homeassistant.components.backup import (
AgentBackup,
BackupAgent,
BackupAgentError,
BackupNotFound,
suggested_filename,
)
from homeassistant.core import HomeAssistant, callback
from . import IDriveE2ConfigEntry
from .const import CONF_BUCKET, DATA_BACKUP_AGENT_LISTENERS, DOMAIN
_LOGGER = logging.getLogger(__name__)
CACHE_TTL = 300
# S3 part size requirements: 5 MiB to 5 GiB per part
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
# We set the threshold to 20 MiB to avoid creating too many parts.
# Note that each part is buffered in memory while it is uploaded.
MULTIPART_MIN_PART_SIZE_BYTES = 20 * 2**20
def handle_boto_errors[T](
func: Callable[..., Coroutine[Any, Any, T]],
) -> Callable[..., Coroutine[Any, Any, T]]:
"""Handle BotoCoreError exceptions by converting them to BackupAgentError."""
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T:
"""Catch BotoCoreError and raise BackupAgentError."""
try:
return await func(*args, **kwargs)
except BotoCoreError as err:
error_msg = f"Failed during {func.__name__}"
raise BackupAgentError(error_msg) from err
return wrapper
async def async_get_backup_agents(
hass: HomeAssistant,
) -> list[BackupAgent]:
"""Return a list of backup agents."""
entries: list[IDriveE2ConfigEntry] = hass.config_entries.async_loaded_entries(
DOMAIN
)
return [IDriveE2BackupAgent(hass, entry) for entry in entries]
@callback
def async_register_backup_agents_listener(
hass: HomeAssistant,
*,
listener: Callable[[], None],
**kwargs: Any,
) -> Callable[[], None]:
"""Register a listener to be called when agents are added or removed.
:return: A function to unregister the listener.
"""
hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, []).append(listener)
@callback
def remove_listener() -> None:
"""Remove the listener."""
hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)
if not hass.data[DATA_BACKUP_AGENT_LISTENERS]:
del hass.data[DATA_BACKUP_AGENT_LISTENERS]
return remove_listener
def suggested_filenames(backup: AgentBackup) -> tuple[str, str]:
"""Return the suggested filenames for the backup and metadata files."""
base_name = suggested_filename(backup).rsplit(".", 1)[0]
return f"{base_name}.tar", f"{base_name}.metadata.json"
class IDriveE2BackupAgent(BackupAgent):
"""Backup agent for the IDrive e2 integration."""
domain = DOMAIN
def __init__(self, hass: HomeAssistant, entry: IDriveE2ConfigEntry) -> None:
"""Initialize the IDrive e2 agent."""
super().__init__()
self._client: S3Client = entry.runtime_data
self._bucket: str = entry.data[CONF_BUCKET]
self.name = entry.title
self.unique_id = entry.entry_id
self._backup_cache: dict[str, AgentBackup] = {}
self._cache_expiration = time()
@handle_boto_errors
async def async_download_backup(
self,
backup_id: str,
**kwargs: Any,
) -> AsyncIterator[bytes]:
"""Download a backup file.
:param backup_id: The ID of the backup that was returned in async_list_backups.
:return: An async iterator that yields bytes.
"""
backup = await self._find_backup_by_id(backup_id)
tar_filename, _ = suggested_filenames(backup)
response = await cast(Any, self._client).get_object(
Bucket=self._bucket, Key=tar_filename
)
return response["Body"].iter_chunks()
async def async_upload_backup(
self,
*,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
backup: AgentBackup,
**kwargs: Any,
) -> None:
"""Upload a backup.
:param open_stream: A function returning an async iterator that yields bytes.
:param backup: Metadata about the backup that should be uploaded.
"""
tar_filename, metadata_filename = suggested_filenames(backup)
try:
if backup.size < MULTIPART_MIN_PART_SIZE_BYTES:
await self._upload_simple(tar_filename, open_stream)
else:
await self._upload_multipart(tar_filename, open_stream)
# Upload the metadata file
metadata_content = json.dumps(backup.as_dict())
await cast(Any, self._client).put_object(
Bucket=self._bucket,
Key=metadata_filename,
Body=metadata_content,
)
except BotoCoreError as err:
raise BackupAgentError("Failed to upload backup") from err
else:
# Reset cache after successful upload
self._cache_expiration = time()
async def _upload_simple(
self,
tar_filename: str,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
) -> None:
"""Upload a small file using simple upload.
:param tar_filename: The target filename for the backup.
:param open_stream: A function returning an async iterator that yields bytes.
"""
_LOGGER.debug("Starting simple upload for %s", tar_filename)
stream = await open_stream()
file_data = bytearray()
async for chunk in stream:
file_data.extend(chunk)
await cast(Any, self._client).put_object(
Bucket=self._bucket,
Key=tar_filename,
Body=bytes(file_data),
)
async def _upload_multipart(
self,
tar_filename: str,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
) -> None:
"""Upload a large file using multipart upload.
:param tar_filename: The target filename for the backup.
:param open_stream: A function returning an async iterator that yields bytes.
"""
_LOGGER.debug("Starting multipart upload for %s", tar_filename)
multipart_upload = await cast(Any, self._client).create_multipart_upload(
Bucket=self._bucket,
Key=tar_filename,
)
upload_id = multipart_upload["UploadId"]
try:
parts: list[dict[str, Any]] = []
part_number = 1
buffer = bytearray() # bytes buffer to store the data
offset = 0 # start index of unread data inside buffer
stream = await open_stream()
async for chunk in stream:
buffer.extend(chunk)
# Upload parts of exactly MULTIPART_MIN_PART_SIZE_BYTES to ensure
# all non-trailing parts have the same size (defensive implementation)
view = memoryview(buffer)
try:
while len(buffer) - offset >= MULTIPART_MIN_PART_SIZE_BYTES:
start = offset
end = offset + MULTIPART_MIN_PART_SIZE_BYTES
part_data = view[start:end]
offset = end
_LOGGER.debug(
"Uploading part number %d, size %d",
part_number,
len(part_data),
)
part = await cast(Any, self._client).upload_part(
Bucket=self._bucket,
Key=tar_filename,
PartNumber=part_number,
UploadId=upload_id,
Body=part_data.tobytes(),
)
parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
part_number += 1
finally:
view.release()
# Compact the buffer once the consumed offset has grown large enough;
# compacting after every part upload would cost an extra copy each time.
if offset and offset >= MULTIPART_MIN_PART_SIZE_BYTES:
buffer = bytearray(buffer[offset:])
offset = 0
# Upload the final buffer as the last part (no minimum size requirement)
# Offset should be 0 after the last compaction, but we use it as the start
# index to be defensive in case the buffer was not compacted.
if offset < len(buffer):
remaining_data = memoryview(buffer)[offset:]
_LOGGER.debug(
"Uploading final part number %d, size %d",
part_number,
len(remaining_data),
)
part = await cast(Any, self._client).upload_part(
Bucket=self._bucket,
Key=tar_filename,
PartNumber=part_number,
UploadId=upload_id,
Body=remaining_data.tobytes(),
)
parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
await cast(Any, self._client).complete_multipart_upload(
Bucket=self._bucket,
Key=tar_filename,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
except BotoCoreError:
try:
await cast(Any, self._client).abort_multipart_upload(
Bucket=self._bucket,
Key=tar_filename,
UploadId=upload_id,
)
except BotoCoreError:
_LOGGER.exception("Failed to abort multipart upload")
raise
@handle_boto_errors
async def async_delete_backup(
self,
backup_id: str,
**kwargs: Any,
) -> None:
"""Delete a backup file.
:param backup_id: The ID of the backup that was returned in async_list_backups.
"""
backup = await self._find_backup_by_id(backup_id)
tar_filename, metadata_filename = suggested_filenames(backup)
# Delete both the backup file and its metadata file
await cast(Any, self._client).delete_objects(
Bucket=self._bucket,
Delete={
"Objects": [
{"Key": tar_filename},
{"Key": metadata_filename},
]
},
)
# Reset cache after successful deletion
self._cache_expiration = time()
@handle_boto_errors
async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
"""List backups."""
backups = await self._list_backups()
return list(backups.values())
@handle_boto_errors
async def async_get_backup(
self,
backup_id: str,
**kwargs: Any,
) -> AgentBackup:
"""Return a backup."""
return await self._find_backup_by_id(backup_id)
async def _find_backup_by_id(self, backup_id: str) -> AgentBackup:
"""Find a backup by its backup ID."""
backups = await self._list_backups()
if backup := backups.get(backup_id):
return backup
raise BackupNotFound(f"Backup {backup_id} not found")
async def _list_backups(self) -> dict[str, AgentBackup]:
"""List backups, using a cache if possible."""
if time() <= self._cache_expiration:
return self._backup_cache
backups = {}
response = await cast(Any, self._client).list_objects_v2(Bucket=self._bucket)
# Filter for metadata files only
metadata_files = [
obj
for obj in response.get("Contents", [])
if obj["Key"].endswith(".metadata.json")
]
for metadata_file in metadata_files:
try:
# Download and parse metadata file
metadata_response = await cast(Any, self._client).get_object(
Bucket=self._bucket, Key=metadata_file["Key"]
)
metadata_content = await metadata_response["Body"].read()
metadata_json = json.loads(metadata_content)
except (BotoCoreError, json.JSONDecodeError) as err:
_LOGGER.warning(
"Failed to process metadata file %s: %s",
metadata_file["Key"],
err,
)
continue
backup = AgentBackup.from_dict(metadata_json)
backups[backup.backup_id] = backup
self._backup_cache = backups
self._cache_expiration = time() + CACHE_TTL
return self._backup_cache
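
_upload_multipart slices the growing buffer into parts of exactly MULTIPART_MIN_PART_SIZE_BYTES (20 MiB), so a 1 GiB backup produces 51 full parts plus one 4 MiB trailing part, no matter how the source stream chunks its data. A stripped-down sketch of the same slicing logic (upload is a hypothetical stand-in for upload_part; unlike the code above, it compacts the buffer after every part, trading an extra copy for brevity):

PART_SIZE = 20 * 2**20  # mirrors MULTIPART_MIN_PART_SIZE_BYTES

async def slice_into_parts(stream, upload) -> None:
    """Turn arbitrary-size input chunks into fixed-size upload parts."""
    buffer = bytearray()
    part_number = 1
    async for chunk in stream:
        buffer.extend(chunk)
        while len(buffer) >= PART_SIZE:
            await upload(part_number, bytes(buffer[:PART_SIZE]))
            del buffer[:PART_SIZE]  # compact eagerly: drop the consumed bytes
            part_number += 1
    if buffer:
        # Trailing part: S3 imposes no minimum size on the last part
        await upload(part_number, bytes(buffer))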

homeassistant/components/idrive_e2/config_flow.py

@@ -0,0 +1,147 @@
"""IDrive e2 config flow."""
from __future__ import annotations
import logging
from typing import Any, cast
from aiobotocore.session import AioSession
from botocore.exceptions import ClientError, ConnectionError
from idrive_e2 import CannotConnect, IDriveE2Client, InvalidAuth
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import (
CONF_ACCESS_KEY_ID,
CONF_BUCKET,
CONF_ENDPOINT_URL,
CONF_SECRET_ACCESS_KEY,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_ACCESS_KEY_ID): cv.string,
vol.Required(CONF_SECRET_ACCESS_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
}
)
async def _list_buckets(
endpoint_url: str, access_key: str, secret_key: str
) -> list[str]:
"""List S3 buckets."""
session = AioSession()
async with session.create_client(
"s3",
endpoint_url=endpoint_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
) as client:
result = await cast(Any, client).list_buckets()
return [bucket["Name"] for bucket in result.get("Buckets", []) if "Name" in bucket]
class IDriveE2ConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for IDrive e2."""
_data: dict[str, str]
_buckets: list[str]
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""First step: prompt for access_key and secret_access_key, then fetch region endpoint and buckets."""
errors: dict[str, str] = {}
if user_input is not None:
session = async_get_clientsession(self.hass)
client = IDriveE2Client(session)
try:
endpoint = await client.get_region_endpoint(
user_input[CONF_ACCESS_KEY_ID]
)
# Get the list of buckets belonging to the provided credentials
buckets = await _list_buckets(
endpoint,
user_input[CONF_ACCESS_KEY_ID],
user_input[CONF_SECRET_ACCESS_KEY],
)
except (InvalidAuth, ClientError):
errors["base"] = "invalid_credentials"
except (CannotConnect, ConnectionError):
errors["base"] = "cannot_connect"
except ValueError:
errors["base"] = "invalid_endpoint_url"
else:
# Check if any buckets were found
if not buckets:
errors["base"] = "no_buckets"
if not errors:
# Store validated data for the next step
self._data = {
CONF_ACCESS_KEY_ID: user_input[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: user_input[CONF_SECRET_ACCESS_KEY],
CONF_ENDPOINT_URL: endpoint,
}
self._buckets = buckets
return await self.async_step_bucket()
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
)
async def async_step_bucket(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Second step: list preloaded buckets and let user select from dropdown."""
if user_input is not None:
# Check if the entry already exists to avoid duplicates
self._async_abort_entries_match(
{
CONF_BUCKET: user_input[CONF_BUCKET],
CONF_ENDPOINT_URL: self._data[CONF_ENDPOINT_URL],
}
)
return self.async_create_entry(
title=user_input[CONF_BUCKET],
data={**self._data, CONF_BUCKET: user_input[CONF_BUCKET]},
)
# Show the bucket selection form with a dropdown selector
return self.async_show_form(
step_id="bucket",
data_schema=vol.Schema(
{
vol.Required(CONF_BUCKET): SelectSelector(
config=SelectSelectorConfig(
options=self._buckets, mode=SelectSelectorMode.DROPDOWN
)
)
}
),
)
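
After both steps, the created entry's data carries exactly four keys: the credentials and endpoint validated in async_step_user, plus the bucket chosen in async_step_bucket. Illustratively (all values below are placeholders):

entry_data = {
    "access_key_id": "AKEXAMPLE",                       # CONF_ACCESS_KEY_ID
    "secret_access_key": "example-secret",              # CONF_SECRET_ACCESS_KEY
    "endpoint_url": "https://abcd.region.example.com",  # CONF_ENDPOINT_URL (resolved via IDriveE2Client)
    "bucket": "my-backups",                             # CONF_BUCKET
}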

homeassistant/components/idrive_e2/const.py

@@ -0,0 +1,17 @@
"""Constants for the IDrive e2 integration."""
from collections.abc import Callable
from typing import Final
from homeassistant.util.hass_dict import HassKey
DOMAIN: Final = "idrive_e2"
CONF_ACCESS_KEY_ID = "access_key_id"
CONF_SECRET_ACCESS_KEY = "secret_access_key"
CONF_ENDPOINT_URL = "endpoint_url"
CONF_BUCKET = "bucket"
DATA_BACKUP_AGENT_LISTENERS: HassKey[list[Callable[[], None]]] = HassKey(
f"{DOMAIN}.backup_agent_listeners"
)
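
Typing DATA_BACKUP_AGENT_LISTENERS as a HassKey means every hass.data access through it is statically checked as list[Callable[[], None]]. A small sketch of the pattern using a hypothetical key (backup.py's async_register_backup_agents_listener uses the real one the same way):

from collections.abc import Callable

from homeassistant.core import HomeAssistant
from homeassistant.util.hass_dict import HassKey

# Hypothetical key for illustration only
EXAMPLE_LISTENERS: HassKey[list[Callable[[], None]]] = HassKey("example.listeners")

def register(hass: HomeAssistant, listener: Callable[[], None]) -> None:
    # setdefault's result is inferred as list[Callable[[], None]]
    hass.data.setdefault(EXAMPLE_LISTENERS, []).append(listener)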

homeassistant/components/idrive_e2/manifest.json

@@ -0,0 +1,12 @@
{
"domain": "idrive_e2",
"name": "IDrive e2",
"codeowners": ["@patrickvorgers"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/idrive_e2",
"integration_type": "service",
"iot_class": "cloud_push",
"loggers": ["aiobotocore"],
"quality_scale": "bronze",
"requirements": ["aiobotocore==2.21.1", "idrive-e2-client==0.1.1"]
}

homeassistant/components/idrive_e2/quality_scale.yaml

@@ -0,0 +1,112 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not register custom actions.
appropriate-polling:
status: exempt
comment: This integration does not poll.
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: This integration does not have any custom actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: Entities of this integration do not explicitly subscribe to events.
entity-unique-id:
status: exempt
comment: This integration does not have entities.
has-entity-name:
status: exempt
comment: This integration does not have entities.
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: Integration does not register custom actions.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: This integration does not have an options flow.
docs-installation-parameters: done
entity-unavailable:
status: exempt
comment: This integration does not have entities.
integration-owner: done
log-when-unavailable: todo
parallel-updates:
status: exempt
comment: This integration does not poll.
reauthentication-flow: todo
test-coverage: done
# Gold
devices:
status: exempt
comment: This integration does not have entities.
diagnostics: todo
discovery-update-info:
status: exempt
comment: S3 is a cloud service that is not discovered on the network.
discovery:
status: exempt
comment: S3 is a cloud service that is not discovered on the network.
docs-data-update:
status: exempt
comment: This integration does not poll.
docs-examples:
status: exempt
comment: The integration extends core functionality and does not require examples.
docs-known-limitations:
status: exempt
comment: No known limitations.
docs-supported-devices:
status: exempt
comment: This integration does not support physical devices.
docs-supported-functions: done
docs-troubleshooting:
status: exempt
comment: There are no more detailed troubleshooting instructions available than what is already included in strings.json.
docs-use-cases: done
dynamic-devices:
status: exempt
comment: This integration does not have devices.
entity-category:
status: exempt
comment: This integration does not have entities.
entity-device-class:
status: exempt
comment: This integration does not have entities.
entity-disabled-by-default:
status: exempt
comment: This integration does not have entities.
entity-translations:
status: exempt
comment: This integration does not have entities.
exception-translations: done
icon-translations:
status: exempt
comment: This integration does not use icons.
reconfiguration-flow: todo
repair-issues:
status: exempt
comment: There are no issues which can be repaired.
stale-devices:
status: exempt
comment: This integration does not have devices.
# Platinum
async-dependency: done
inject-websession: todo
strict-typing: todo

homeassistant/components/idrive_e2/strings.json

@@ -0,0 +1,56 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"cannot_connect": "[%key:component::idrive_e2::exceptions::cannot_connect::message%]",
"invalid_credentials": "[%key:component::idrive_e2::exceptions::invalid_credentials::message%]",
"invalid_endpoint_url": "[%key:component::idrive_e2::exceptions::invalid_endpoint_url::message%]"
},
"error": {
"bucket_not_found": "[%key:component::idrive_e2::exceptions::bucket_not_found::message%]",
"cannot_connect": "[%key:component::idrive_e2::exceptions::cannot_connect::message%]",
"invalid_credentials": "[%key:component::idrive_e2::exceptions::invalid_credentials::message%]",
"invalid_endpoint_url": "[%key:component::idrive_e2::exceptions::invalid_endpoint_url::message%]",
"no_buckets": "[%key:component::idrive_e2::exceptions::no_buckets::message%]"
},
"step": {
"bucket": {
"data": {
"bucket": "Bucket name"
},
"data_description": {
"bucket": "The name of the bucket to store the Home Assistant backups in."
},
"title": "Select IDrive e2 bucket"
},
"user": {
"data": {
"access_key_id": "Access key ID",
"secret_access_key": "Secret access key"
},
"data_description": {
"access_key_id": "Access key ID to connect to IDrive e2 API",
"secret_access_key": "Secret access key to connect to IDrive e2 API"
},
"title": "Add IDrive e2"
}
}
},
"exceptions": {
"bucket_not_found": {
"message": "Bucket \"{bucket}\" does not exist. Please recreate this bucket in IDrive e2 before continuing."
},
"cannot_connect": {
"message": "Cannot connect to endpoint"
},
"invalid_credentials": {
"message": "The provided access key ID and secret access key are invalid or do not have the required permissions."
},
"invalid_endpoint_url": {
"message": "Invalid endpoint URL. Please make sure it's a valid IDrive e2 endpoint URL."
},
"no_buckets": {
"message": "No buckets were found for the provided credentials. Create a bucket in IDrive e2 first, then try again."
}
}
}

homeassistant/generated/config_flows.py

@@ -319,6 +319,7 @@ FLOWS = {
"ibeacon",
"icloud",
"idasen_desk",
"idrive_e2",
"ifttt",
"igloohome",
"imap",

homeassistant/generated/integrations.json

@@ -3009,6 +3009,12 @@
"iot_class": "cloud_polling",
"single_config_entry": true
},
"idrive_e2": {
"name": "IDrive e2",
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_push"
},
"idteck_prox": {
"name": "IDTECK Proximity Reader",
"integration_type": "hub",

requirements_all.txt (generated)

@@ -220,6 +220,7 @@ aiobafi6==0.9.0
# homeassistant.components.aws
# homeassistant.components.aws_s3
# homeassistant.components.cloudflare_r2
# homeassistant.components.idrive_e2
aiobotocore==2.21.1
# homeassistant.components.comelit
@@ -1278,6 +1279,9 @@ icmplib==3.0
# homeassistant.components.idasen_desk
idasen-ha==2.6.3
# homeassistant.components.idrive_e2
idrive-e2-client==0.1.1
# homeassistant.components.network
ifaddr==0.2.0

requirements_test_all.txt (generated)

@@ -211,6 +211,7 @@ aiobafi6==0.9.0
# homeassistant.components.aws
# homeassistant.components.aws_s3
# homeassistant.components.cloudflare_r2
# homeassistant.components.idrive_e2
aiobotocore==2.21.1
# homeassistant.components.comelit
@@ -1133,6 +1134,9 @@ icmplib==3.0
# homeassistant.components.idasen_desk
idasen-ha==2.6.3
# homeassistant.components.idrive_e2
idrive-e2-client==0.1.1
# homeassistant.components.network
ifaddr==0.2.0

tests/components/idrive_e2/__init__.py

@@ -0,0 +1,15 @@
"""Tests for the IDrive e2 integration."""
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def setup_integration(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> bool:
"""Set up the IDrive e2 integration for testing."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
return result

tests/components/idrive_e2/conftest.py

@@ -0,0 +1,88 @@
"""Common fixtures for the IDrive e2 tests."""
from __future__ import annotations
from collections.abc import AsyncIterator, Generator
import json
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.backup import AgentBackup
from homeassistant.components.idrive_e2 import CONF_BUCKET
from homeassistant.components.idrive_e2.backup import (
MULTIPART_MIN_PART_SIZE_BYTES,
suggested_filenames,
)
from homeassistant.components.idrive_e2.const import DOMAIN
from .const import USER_INPUT
from tests.common import MockConfigEntry
@pytest.fixture(
params=[2**20, MULTIPART_MIN_PART_SIZE_BYTES],
ids=["small", "large"],
)
def agent_backup(request: pytest.FixtureRequest) -> AgentBackup:
"""Test backup fixture."""
return AgentBackup(
addons=[],
backup_id="23e64aec",
date="2024-11-22T11:48:48.727189+01:00",
database_included=True,
extra_metadata={},
folders=[],
homeassistant_included=True,
homeassistant_version="2024.12.0.dev0",
name="Core 2024.12.0.dev0",
protected=False,
size=request.param,
)
@pytest.fixture(autouse=True)
def mock_client(agent_backup: AgentBackup) -> Generator[AsyncMock]:
"""Mock the IDrive e2 client."""
with patch(
"homeassistant.components.idrive_e2.AioSession.create_client",
autospec=True,
return_value=AsyncMock(),
) as create_client:
client = create_client.return_value
tar_file, metadata_file = suggested_filenames(agent_backup)
client.list_objects_v2.return_value = {
"Contents": [{"Key": tar_file}, {"Key": metadata_file}]
}
client.create_multipart_upload.return_value = {"UploadId": "upload_id"}
client.upload_part.return_value = {"ETag": "etag"}
client.list_buckets.return_value = {
"Buckets": [{"Name": USER_INPUT[CONF_BUCKET]}]
}
# To simplify this mock, we assume that the backup body is always
# iterated over, while the metadata object is always read as a whole
class MockStream:
async def iter_chunks(self) -> AsyncIterator[bytes]:
yield b"backup data"
async def read(self) -> bytes:
return json.dumps(agent_backup.as_dict()).encode()
client.get_object.return_value = {"Body": MockStream()}
client.head_bucket.return_value = {}
create_client.return_value.__aenter__.return_value = client
yield client
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Return the default mocked config entry."""
return MockConfigEntry(
entry_id="test",
title="test",
domain=DOMAIN,
data=USER_INPUT,
)

tests/components/idrive_e2/const.py

@@ -0,0 +1,15 @@
"""Consts for IDrive e2 tests."""
from homeassistant.components.idrive_e2.const import (
CONF_ACCESS_KEY_ID,
CONF_BUCKET,
CONF_ENDPOINT_URL,
CONF_SECRET_ACCESS_KEY,
)
USER_INPUT = {
CONF_ACCESS_KEY_ID: "TestTestTestTestTest",
CONF_SECRET_ACCESS_KEY: "TestTestTestTestTestTestTestTestTestTest",
CONF_ENDPOINT_URL: "https://c7h8.fra201.idrivee2-98.com",
CONF_BUCKET: "test",
}

tests/components/idrive_e2/test_backup.py

@@ -0,0 +1,528 @@
"""Test the IDrive e2 backup platform."""
from collections.abc import AsyncGenerator
from io import StringIO
import json
from time import time
from unittest.mock import AsyncMock, Mock, patch
from botocore.exceptions import ConnectTimeoutError
import pytest
from homeassistant.components.backup import DOMAIN as BACKUP_DOMAIN, AgentBackup
from homeassistant.components.idrive_e2.backup import (
MULTIPART_MIN_PART_SIZE_BYTES,
BotoCoreError,
IDriveE2BackupAgent,
async_register_backup_agents_listener,
suggested_filenames,
)
from homeassistant.components.idrive_e2.const import (
CONF_ENDPOINT_URL,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from . import setup_integration
from .const import USER_INPUT
from tests.common import MockConfigEntry
from tests.typing import ClientSessionGenerator, MagicMock, WebSocketGenerator
@pytest.fixture(autouse=True)
async def setup_backup_integration(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> AsyncGenerator[None]:
"""Set up IDrive e2 integration."""
with (
patch("homeassistant.components.backup.is_hassio", return_value=False),
patch("homeassistant.components.backup.store.STORE_DELAY_SAVE", 0),
):
assert await async_setup_component(hass, BACKUP_DOMAIN, {})
await setup_integration(hass, mock_config_entry)
await hass.async_block_till_done()
yield
async def test_suggested_filenames() -> None:
"""Test the suggested_filenames function."""
backup = AgentBackup(
backup_id="a1b2c3",
date="2021-01-01T01:02:03+00:00",
addons=[],
database_included=False,
extra_metadata={},
folders=[],
homeassistant_included=False,
homeassistant_version=None,
name="my_pretty_backup",
protected=False,
size=0,
)
tar_filename, metadata_filename = suggested_filenames(backup)
assert tar_filename == "my_pretty_backup_2021-01-01_01.02_03000000.tar"
assert (
metadata_filename == "my_pretty_backup_2021-01-01_01.02_03000000.metadata.json"
)
async def test_agents_info(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test backup agent info."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/agents/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agents": [
{"agent_id": "backup.local", "name": "local"},
{
"agent_id": f"{DOMAIN}.{mock_config_entry.entry_id}",
"name": mock_config_entry.title,
},
],
}
async def test_agents_list_backups(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
agent_backup: AgentBackup,
) -> None:
"""Test agent list backups."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backups"] == [
{
"addons": agent_backup.addons,
"agents": {
f"{DOMAIN}.{mock_config_entry.entry_id}": {
"protected": agent_backup.protected,
"size": agent_backup.size,
}
},
"backup_id": agent_backup.backup_id,
"database_included": agent_backup.database_included,
"date": agent_backup.date,
"extra_metadata": agent_backup.extra_metadata,
"failed_addons": [],
"failed_agent_ids": [],
"failed_folders": [],
"folders": agent_backup.folders,
"homeassistant_included": agent_backup.homeassistant_included,
"homeassistant_version": agent_backup.homeassistant_version,
"name": agent_backup.name,
"with_automatic_settings": None,
}
]
async def test_agents_get_backup(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
agent_backup: AgentBackup,
) -> None:
"""Test agent get backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": agent_backup.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backup"] == {
"addons": agent_backup.addons,
"agents": {
f"{DOMAIN}.{mock_config_entry.entry_id}": {
"protected": agent_backup.protected,
"size": agent_backup.size,
}
},
"backup_id": agent_backup.backup_id,
"database_included": agent_backup.database_included,
"date": agent_backup.date,
"extra_metadata": agent_backup.extra_metadata,
"failed_addons": [],
"failed_agent_ids": [],
"failed_folders": [],
"folders": agent_backup.folders,
"homeassistant_included": agent_backup.homeassistant_included,
"homeassistant_version": agent_backup.homeassistant_version,
"name": agent_backup.name,
"with_automatic_settings": None,
}
async def test_agents_get_backup_does_not_throw_on_not_found(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
) -> None:
"""Test agent get backup does not throw on a backup not found."""
mock_client.list_objects_v2.return_value = {"Contents": []}
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/details", "backup_id": "random"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backup"] is None
async def test_agents_list_backups_with_corrupted_metadata(
hass: HomeAssistant,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
caplog: pytest.LogCaptureFixture,
agent_backup: AgentBackup,
) -> None:
"""Test listing backups when one metadata file is corrupted."""
# Create agent
agent = IDriveE2BackupAgent(hass, mock_config_entry)
# Set up mock responses for both valid and corrupted metadata files
mock_client.list_objects_v2.return_value = {
"Contents": [
{
"Key": "valid_backup.metadata.json",
"LastModified": "2023-01-01T00:00:00+00:00",
},
{
"Key": "corrupted_backup.metadata.json",
"LastModified": "2023-01-01T00:00:00+00:00",
},
]
}
# Mock responses for get_object calls
valid_metadata = json.dumps(agent_backup.as_dict())
corrupted_metadata = "{invalid json content"
async def mock_get_object(**kwargs):
"""Mock get_object with different responses based on the key."""
key = kwargs.get("Key", "")
if "valid_backup" in key:
mock_body = AsyncMock()
mock_body.read.return_value = valid_metadata.encode()
return {"Body": mock_body}
# Corrupted metadata
mock_body = AsyncMock()
mock_body.read.return_value = corrupted_metadata.encode()
return {"Body": mock_body}
mock_client.get_object.side_effect = mock_get_object
backups = await agent.async_list_backups()
assert len(backups) == 1
assert backups[0].backup_id == agent_backup.backup_id
assert "Failed to process metadata file" in caplog.text
async def test_agents_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
) -> None:
"""Test agent delete backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": "23e64aec",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}}
# Should delete both the tar and the metadata file
assert mock_client.delete_objects.call_count == 1
kwargs = mock_client.delete_objects.call_args.kwargs
assert "Delete" in kwargs and "Objects" in kwargs["Delete"]
assert len(kwargs["Delete"]["Objects"]) == 2
async def test_agents_delete_not_throwing_on_not_found(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
) -> None:
"""Test agent delete backup does not throw on a backup not found."""
mock_client.list_objects_v2.return_value = {"Contents": []}
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": "random",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}}
assert mock_client.delete_object.call_count == 0
async def test_agents_upload(
hass_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
agent_backup: AgentBackup,
) -> None:
"""Test agent upload backup."""
client = await hass_client()
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=agent_backup,
),
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=agent_backup,
),
patch("pathlib.Path.open") as mocked_open,
):
# We must emit at least two chunks; the "appendix" chunk triggers
# the upload of the final buffer part
mocked_open.return_value.read = Mock(
side_effect=[
b"a" * agent_backup.size,
b"appendix",
b"",
]
)
resp = await client.post(
f"/api/backup/upload?agent_id={DOMAIN}.{mock_config_entry.entry_id}",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert f"Uploading backup {agent_backup.backup_id}" in caplog.text
if agent_backup.size < MULTIPART_MIN_PART_SIZE_BYTES:
# single part + metadata both as regular upload (no multiparts)
assert mock_client.create_multipart_upload.await_count == 0
assert mock_client.put_object.await_count == 2
else:
assert "Uploading final part" in caplog.text
# 2 parts as multipart + metadata as regular upload
assert mock_client.create_multipart_upload.await_count == 1
assert mock_client.upload_part.await_count == 2
assert mock_client.complete_multipart_upload.await_count == 1
assert mock_client.put_object.await_count == 1
async def test_agents_upload_network_failure(
hass_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
agent_backup: AgentBackup,
) -> None:
"""Test agent upload backup with network failure."""
client = await hass_client()
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=agent_backup,
),
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=agent_backup,
),
patch("pathlib.Path.open") as mocked_open,
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
# simulate network failure
mock_client.put_object.side_effect = mock_client.upload_part.side_effect = (
mock_client.abort_multipart_upload.side_effect
) = ConnectTimeoutError(endpoint_url=USER_INPUT[CONF_ENDPOINT_URL])
resp = await client.post(
f"/api/backup/upload?agent_id={DOMAIN}.{mock_config_entry.entry_id}",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert "Upload failed for idrive_e2" in caplog.text
async def test_multipart_upload_consistent_part_sizes(
hass: HomeAssistant,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test that multipart upload uses consistent part sizes.
Defensive implementation to ensure that even if the input stream yields
varying chunk sizes, the multipart upload will still create parts of the
same correct size. This test verifies that varying chunk sizes still
result in consistent part sizes.
"""
agent = IDriveE2BackupAgent(hass, mock_config_entry)
# simulate varying chunk data sizes
# total data: 12 + 12 + 10 + 12 + 5 = 51 MiB
chunk_sizes = [12, 12, 10, 12, 5] # in units of 1 MiB
mib = 2**20
async def mock_stream():
for size in chunk_sizes:
yield b"x" * (size * mib)
async def open_stream():
return mock_stream()
# Record the sizes of each uploaded part
uploaded_part_sizes: list[int] = []
async def record_upload_part(**kwargs):
body = kwargs.get("Body", b"")
uploaded_part_sizes.append(len(body))
return {"ETag": f"etag-{len(uploaded_part_sizes)}"}
mock_client.upload_part.side_effect = record_upload_part
await agent._upload_multipart("test.tar", open_stream)
# Verify that all non-trailing parts have the same size
assert len(uploaded_part_sizes) >= 2, "Expected at least 2 parts"
non_trailing_parts = uploaded_part_sizes[:-1]
assert all(size == MULTIPART_MIN_PART_SIZE_BYTES for size in non_trailing_parts), (
f"All non-trailing parts should be {MULTIPART_MIN_PART_SIZE_BYTES} bytes, got {non_trailing_parts}"
)
# Verify the trailing part contains the remainder
total_data = sum(chunk_sizes) * mib
expected_trailing = total_data % MULTIPART_MIN_PART_SIZE_BYTES
if expected_trailing == 0:
expected_trailing = MULTIPART_MIN_PART_SIZE_BYTES
assert uploaded_part_sizes[-1] == expected_trailing
async def test_agents_download(
hass_client: ClientSessionGenerator,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test agent download backup."""
client = await hass_client()
backup_id = "23e64aec"
resp = await client.get(
f"/api/backup/download/{backup_id}?agent_id={DOMAIN}.{mock_config_entry.entry_id}"
)
assert resp.status == 200
assert await resp.content.read() == b"backup data"
assert mock_client.get_object.call_count == 2 # One for metadata, one for tar file
async def test_error_during_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
agent_backup: AgentBackup,
) -> None:
"""Test the error wrapper."""
mock_client.delete_objects.side_effect = BotoCoreError
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": agent_backup.backup_id,
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agent_errors": {
f"{DOMAIN}.{mock_config_entry.entry_id}": "Failed during async_delete_backup"
}
}
async def test_cache_expiration(
hass: HomeAssistant,
mock_client: MagicMock,
agent_backup: AgentBackup,
) -> None:
"""Test that the cache expires correctly."""
# Mock the entry
mock_entry = MockConfigEntry(
domain=DOMAIN,
data={"bucket": "test-bucket"},
unique_id="test-unique-id",
title="Test IDrive e2",
)
mock_entry.runtime_data = mock_client
# Create agent
agent = IDriveE2BackupAgent(hass, mock_entry)
# Mock metadata response
metadata_content = json.dumps(agent_backup.as_dict())
mock_body = AsyncMock()
mock_body.read.return_value = metadata_content.encode()
mock_client.list_objects_v2.return_value = {
"Contents": [
{"Key": "test.metadata.json", "LastModified": "2023-01-01T00:00:00+00:00"}
]
}
# First call should query IDrive e2
await agent.async_list_backups()
assert mock_client.list_objects_v2.call_count == 1
assert mock_client.get_object.call_count == 1
# Second call should use cache
await agent.async_list_backups()
assert mock_client.list_objects_v2.call_count == 1
assert mock_client.get_object.call_count == 1
# Set cache to expire
agent._cache_expiration = time() - 1
# Third call should query IDrive e2 again
await agent.async_list_backups()
assert mock_client.list_objects_v2.call_count == 2
assert mock_client.get_object.call_count == 2
async def test_listeners_get_cleaned_up(hass: HomeAssistant) -> None:
"""Test listener gets cleaned up."""
listener = MagicMock()
remove_listener = async_register_backup_agents_listener(hass, listener=listener)
hass.data[DATA_BACKUP_AGENT_LISTENERS] = [
listener
] # make sure it's the last listener
remove_listener()
assert DATA_BACKUP_AGENT_LISTENERS not in hass.data

tests/components/idrive_e2/test_config_flow.py

@@ -0,0 +1,336 @@
"""Test the IDrive e2 config flow."""
from __future__ import annotations
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
from botocore.exceptions import EndpointConnectionError
from idrive_e2 import CannotConnect, InvalidAuth
import pytest
import voluptuous as vol
from homeassistant.components.idrive_e2 import ClientError
from homeassistant.components.idrive_e2.config_flow import CONF_ACCESS_KEY_ID
from homeassistant.components.idrive_e2.const import (
CONF_BUCKET,
CONF_ENDPOINT_URL,
CONF_SECRET_ACCESS_KEY,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_USER
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.selector import SelectSelector
from .const import USER_INPUT
from tests.common import MockConfigEntry
@pytest.fixture
def mock_idrive_client() -> Generator[AsyncMock]:
"""Patch IDriveE2Client to return a mocked client."""
mock_client = AsyncMock()
mock_client.get_region_endpoint.return_value = USER_INPUT[CONF_ENDPOINT_URL]
with patch(
"homeassistant.components.idrive_e2.config_flow.IDriveE2Client",
return_value=mock_client,
):
yield mock_client
async def test_flow(
hass: HomeAssistant,
mock_idrive_client: AsyncMock,
mock_client: AsyncMock,
) -> None:
"""Test config flow success path."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "bucket"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_BUCKET: USER_INPUT[CONF_BUCKET]},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test"
assert result["data"] == USER_INPUT
@pytest.mark.parametrize(
("exception", "errors"),
[
(
ClientError(
{"Error": {"Code": "403", "Message": "Forbidden"}}, "list_buckets"
),
{"base": "invalid_credentials"},
),
(ValueError(), {"base": "invalid_endpoint_url"}),
(
EndpointConnectionError(endpoint_url="http://example.com"),
{"base": "cannot_connect"},
),
],
)
async def test_flow_list_buckets_errors(
hass: HomeAssistant,
mock_idrive_client: AsyncMock,
mock_client: AsyncMock,
exception: Exception,
errors: dict[str, str],
) -> None:
"""Test errors when listing buckets."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
flow_id = result["flow_id"]
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# First attempt: fail
mock_client.list_buckets.side_effect = exception
result = await hass.config_entries.flow.async_configure(
flow_id,
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == errors
# Second attempt: fix and finish to CREATE_ENTRY
mock_client.list_buckets.side_effect = None
result = await hass.config_entries.flow.async_configure(
flow_id,
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "bucket"
result = await hass.config_entries.flow.async_configure(
flow_id,
{CONF_BUCKET: USER_INPUT[CONF_BUCKET]},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test"
assert result["data"] == USER_INPUT
async def test_flow_no_buckets(
hass: HomeAssistant,
mock_idrive_client: AsyncMock,
mock_client: AsyncMock,
) -> None:
"""Test we show an error when no buckets are returned."""
# Start flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
flow_id = result["flow_id"]
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# First attempt: empty bucket list -> error
mock_client.list_buckets.return_value = {"Buckets": []}
result = await hass.config_entries.flow.async_configure(
flow_id,
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "no_buckets"}
# Second attempt: fix and finish to CREATE_ENTRY
mock_client.list_buckets.return_value = {
"Buckets": [{"Name": USER_INPUT[CONF_BUCKET]}]
}
result = await hass.config_entries.flow.async_configure(
flow_id,
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "bucket"
result = await hass.config_entries.flow.async_configure(
flow_id,
{CONF_BUCKET: USER_INPUT[CONF_BUCKET]},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test"
assert result["data"] == USER_INPUT
async def test_flow_bucket_step_options_from_s3_list_buckets(
hass: HomeAssistant,
mock_idrive_client: AsyncMock,
mock_client: AsyncMock,
) -> None:
"""Test bucket step shows dropdown options coming from S3 list_buckets()."""
# Start flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
flow_id = result["flow_id"]
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# S3 list_buckets returns our test payload
mock_client.list_buckets.return_value = {
"Buckets": [{"Name": "bucket1"}, {"Name": "bucket2"}]
}
# Submit credentials
result = await hass.config_entries.flow.async_configure(
flow_id,
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "bucket"
# Extract dropdown options from selector in schema
schema = result["data_schema"].schema
selector = schema[vol.Required(CONF_BUCKET)]
assert isinstance(selector, SelectSelector)
cfg = selector.config
options = cfg["options"] if isinstance(cfg, dict) else cfg.options
assert options == ["bucket1", "bucket2"]
# Continue to finish to CREATE_ENTRY
result = await hass.config_entries.flow.async_configure(
flow_id,
{CONF_BUCKET: "bucket1"},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "bucket1"
assert result["data"][CONF_BUCKET] == "bucket1"
@pytest.mark.parametrize(
("exception", "expected_error"),
[
(InvalidAuth("Invalid credentials"), "invalid_credentials"),
(CannotConnect("cannot connect"), "cannot_connect"),
],
)
async def test_flow_get_region_endpoint_error(
hass: HomeAssistant,
mock_idrive_client: AsyncMock,
mock_client: AsyncMock,
exception: Exception,
expected_error: str,
) -> None:
"""Test user step error mapping when resolving region endpoint via client."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
flow_id = result["flow_id"]
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# First attempt: fail endpoint resolution
mock_idrive_client.get_region_endpoint.side_effect = exception
result = await hass.config_entries.flow.async_configure(
flow_id,
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": expected_error}
# Second attempt: fix and finish to CREATE_ENTRY
mock_idrive_client.get_region_endpoint.side_effect = None
mock_idrive_client.get_region_endpoint.return_value = USER_INPUT[CONF_ENDPOINT_URL]
result = await hass.config_entries.flow.async_configure(
flow_id,
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "bucket"
result = await hass.config_entries.flow.async_configure(
flow_id,
{CONF_BUCKET: USER_INPUT[CONF_BUCKET]},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == USER_INPUT
async def test_abort_if_already_configured(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_idrive_client: AsyncMock,
mock_client: AsyncMock,
) -> None:
"""Test we abort if the account is already configured."""
# Existing entry that should cause abort when selecting the same bucket + endpoint
MockConfigEntry(
domain=mock_config_entry.domain,
title=mock_config_entry.title,
data={
**mock_config_entry.data,
CONF_BUCKET: USER_INPUT[CONF_BUCKET],
CONF_ENDPOINT_URL: USER_INPUT[CONF_ENDPOINT_URL],
},
unique_id="existing",
).add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_ACCESS_KEY_ID: USER_INPUT[CONF_ACCESS_KEY_ID],
CONF_SECRET_ACCESS_KEY: USER_INPUT[CONF_SECRET_ACCESS_KEY],
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "bucket"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_BUCKET: USER_INPUT[CONF_BUCKET]},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"

tests/components/idrive_e2/test_init.py

@@ -0,0 +1,108 @@
"""Test the IDrive e2 storage integration."""
from unittest.mock import AsyncMock, patch
from botocore.exceptions import (
ClientError,
EndpointConnectionError,
ParamValidationError,
)
import pytest
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from . import setup_integration
from tests.common import MockConfigEntry
async def test_async_setup_entry_does_not_mask_when_close_fails(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_client: AsyncMock,
) -> None:
"""Test close failures do not mask the original setup exception."""
mock_config_entry.add_to_hass(hass)
# Force setup to fail after the client has been created
mock_client.head_bucket.side_effect = ClientError(
{"Error": {"Code": "403", "Message": "Forbidden"}}, "HeadBucket"
)
# Also force close() to fail
mock_client.close.side_effect = RuntimeError("boom")
assert await hass.config_entries.async_setup(mock_config_entry.entry_id) is False
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR
mock_client.close.assert_awaited_once()
async def test_load_unload_config_entry(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test loading and unloading the integration."""
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.LOADED
await hass.config_entries.async_unload(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.NOT_LOADED
@pytest.mark.parametrize(
("exception", "state"),
[
(
ParamValidationError(report="Invalid bucket name"),
ConfigEntryState.SETUP_ERROR,
),
(ValueError(), ConfigEntryState.SETUP_ERROR),
(
EndpointConnectionError(endpoint_url="https://example.com"),
ConfigEntryState.SETUP_RETRY,
),
],
)
async def test_setup_entry_create_client_errors(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
exception: Exception,
state: ConfigEntryState,
) -> None:
"""Test various setup errors."""
with patch(
"homeassistant.components.idrive_e2.AioSession.create_client",
side_effect=exception,
):
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is state
@pytest.mark.parametrize(
("error_response"),
[
{"Error": {"Code": "InvalidAccessKeyId"}},
{"Error": {"Code": "404", "Message": "Not Found"}},
],
ids=["invalid_access_key", "bucket_not_found"],
)
async def test_setup_entry_head_bucket_errors(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_client: AsyncMock,
error_response: dict,
) -> None:
"""Test setup_entry errors when calling head_bucket."""
mock_client.head_bucket.side_effect = ClientError(
error_response=error_response,
operation_name="head_bucket",
)
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR