1
0
mirror of https://github.com/home-assistant/core.git synced 2026-05-30 12:14:20 +01:00
Files
2026-05-14 07:17:37 +02:00

200 lines
7.0 KiB
Python

"""An abstract class common to all Bond entities."""
from abc import abstractmethod
from asyncio import Lock
from datetime import datetime
import logging
from aiohttp import ClientError
from homeassistant.const import (
ATTR_HW_VERSION,
ATTR_MODEL,
ATTR_NAME,
ATTR_SUGGESTED_AREA,
ATTR_SW_VERSION,
ATTR_VIA_DEVICE,
)
from homeassistant.core import CALLBACK_TYPE, HassJob, callback
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_call_later
from .const import DOMAIN
from .models import BondData
from .utils import BondDevice
_LOGGER = logging.getLogger(__name__)
_FALLBACK_SCAN_INTERVAL = 10
_BPUP_ALIVE_SCAN_INTERVAL = 60
class BondEntity(Entity):
"""Generic Bond entity encapsulating common features of any Bond device."""
_attr_should_poll = False
def __init__(
self,
data: BondData,
device: BondDevice,
sub_device: str | None = None,
sub_device_id: str | None = None,
) -> None:
"""Initialize entity with API and device info."""
hub = data.hub
self._hub = hub
self._bond = hub.bond
self._device = device
self._device_id = device.device_id
self._sub_device = sub_device
self._attr_available = True
self._bpup_subs = data.bpup_subs
self._update_lock = Lock()
self._initialized = False
if sub_device_id:
sub_device_id = f"_{sub_device_id}"
elif sub_device:
sub_device_id = f"_{sub_device}"
else:
sub_device_id = ""
self._attr_unique_id = f"{hub.bond_id}_{device.device_id}{sub_device_id}"
if sub_device:
sub_device_name = sub_device.replace("_", " ").title()
self._attr_name = f"{device.name} {sub_device_name}"
else:
self._attr_name = device.name
self._attr_assumed_state = self._hub.is_bridge and not self._device.trust_state
self._apply_state()
self._bpup_polling_fallback: CALLBACK_TYPE | None = None
self._async_update_if_bpup_not_alive_job = HassJob(
self._async_update_if_bpup_not_alive
)
@property
def device_info(self) -> DeviceInfo:
"""Get a an HA device representing this Bond controlled device."""
device_info = DeviceInfo(
manufacturer=self._hub.make,
# type ignore: tuple items should not be Optional
identifiers={(DOMAIN, self._hub.bond_id, self._device_id)}, # type: ignore[arg-type]
configuration_url=f"http://{self._hub.host}",
)
if self.name is not None:
device_info[ATTR_NAME] = self._device.name
if self._hub.bond_id is not None:
device_info[ATTR_VIA_DEVICE] = (DOMAIN, self._hub.bond_id)
if self._device.location is not None:
device_info[ATTR_SUGGESTED_AREA] = self._device.location
if not self._hub.is_bridge:
if self._hub.model is not None:
device_info[ATTR_MODEL] = self._hub.model
if self._hub.fw_ver is not None:
device_info[ATTR_SW_VERSION] = self._hub.fw_ver
if self._hub.mcu_ver is not None:
device_info[ATTR_HW_VERSION] = self._hub.mcu_ver
else:
model_data = []
if self._device.branding_profile:
model_data.append(self._device.branding_profile)
if self._device.template:
model_data.append(self._device.template)
if model_data:
device_info[ATTR_MODEL] = " ".join(model_data)
return device_info
async def async_update(self) -> None:
"""Perform a manual update from API."""
await self._async_update_from_api()
@callback
def _async_update_if_bpup_not_alive(self, now: datetime) -> None:
"""Fetch via the API if BPUP is not alive."""
self._async_schedule_bpup_alive_or_poll()
if self.hass.is_stopping or (
self._bpup_subs.alive and self._initialized and self.available
):
return
if self._update_lock.locked():
_LOGGER.warning(
"Updating %s took longer than the scheduled update interval %s",
self.entity_id,
_FALLBACK_SCAN_INTERVAL,
)
return
self.hass.async_create_background_task(
self._async_update(), f"{DOMAIN} {self.name} update", eager_start=True
)
async def _async_update(self) -> None:
"""Fetch via the API."""
async with self._update_lock:
await self._async_update_from_api()
self.async_write_ha_state()
async def _async_update_from_api(self) -> None:
"""Fetch via the API."""
try:
state: dict = await self._bond.device_state(self._device_id)
except (ClientError, TimeoutError, OSError) as error:
if self.available:
_LOGGER.warning(
"Entity %s has become unavailable", self.entity_id, exc_info=error
)
self._attr_available = False
else:
self._async_state_callback(state)
@abstractmethod
def _apply_state(self) -> None:
raise NotImplementedError
@callback
def _async_state_callback(self, state: dict) -> None:
"""Process a state change."""
self._initialized = True
if not self.available:
_LOGGER.info("Entity %s has come back", self.entity_id)
self._attr_available = True
_LOGGER.debug(
"Device state for %s (%s) is:\n%s", self.name, self.entity_id, state
)
self._device.state = state
self._apply_state()
@callback
def _async_bpup_callback(self, json_msg: dict) -> None:
"""Process a state change from BPUP."""
topic = json_msg["t"]
if topic != f"devices/{self._device_id}/state":
return
self._async_state_callback(json_msg["b"])
self.async_write_ha_state()
async def async_added_to_hass(self) -> None:
"""Subscribe to BPUP and start polling."""
await super().async_added_to_hass()
self._bpup_subs.subscribe(self._device_id, self._async_bpup_callback)
self._async_schedule_bpup_alive_or_poll()
@callback
def _async_schedule_bpup_alive_or_poll(self) -> None:
"""Schedule the BPUP alive or poll."""
alive = self._bpup_subs.alive
self._bpup_polling_fallback = async_call_later(
self.hass,
_BPUP_ALIVE_SCAN_INTERVAL if alive else _FALLBACK_SCAN_INTERVAL,
self._async_update_if_bpup_not_alive_job,
)
async def async_will_remove_from_hass(self) -> None:
"""Unsubscribe from BPUP data on remove."""
await super().async_will_remove_from_hass()
self._bpup_subs.unsubscribe(self._device_id, self._async_bpup_callback)
if self._bpup_polling_fallback:
self._bpup_polling_fallback()
self._bpup_polling_fallback = None