1
0
mirror of https://github.com/home-assistant/core.git synced 2026-05-31 12:44:04 +01:00
Files
2026-05-26 21:40:54 +02:00

45 lines
1.4 KiB
Python

"""Utility functions for Aqualink devices."""
from collections.abc import Awaitable
import httpx
from iaqualink.exception import (
AqualinkServiceException,
AqualinkServiceUnauthorizedException,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, HomeAssistantError
def error_detail(err: Exception) -> str:
    """Describe an exception with text that is never empty.

    The exception's string form is used when it has one; exceptions that
    stringify to "" are described by their class name instead.
    """
    message = str(err)
    return message if message else type(err).__name__
async def await_or_reraise(
    hass: HomeAssistant,
    config_entry: ConfigEntry | None,
    awaitable: Awaitable,
) -> None:
    """Await an Aqualink API call, translating its failures.

    Raises:
        ConfigEntryAuthFailed: the call raised
            AqualinkServiceUnauthorizedException. When a config entry was
            supplied, a reauth flow is started on it before raising.
        HomeAssistantError: the call raised TimeoutError,
            AqualinkServiceException or httpx.HTTPError. The message carries
            the non-empty detail produced by error_detail().
    """
    try:
        await awaitable
    # The unauthorized handler must come first so it is not caught by the
    # broader handler below.
    except AqualinkServiceUnauthorizedException as unauthorized:
        if config_entry is not None:
            config_entry.async_start_reauth(hass)
        raise ConfigEntryAuthFailed(
            "Invalid credentials for iAquaLink"
        ) from unauthorized
    except (TimeoutError, AqualinkServiceException, httpx.HTTPError) as failure:
        # Timeouts and service/transport errors are reported identically.
        message = f"Aqualink error: {error_detail(failure)}"
        raise HomeAssistantError(message) from failure