1
0
mirror of https://github.com/home-assistant/core.git synced 2026-05-20 15:30:26 +01:00
Files

256 lines
9.2 KiB
Python

"""Sensor platform for the Duco integration."""
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
import logging
from duco_connectivity.models import Node, NodeType, VentilationState
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import (
CONCENTRATION_PARTS_PER_MILLION,
PERCENTAGE,
SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
EntityCategory,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util import dt as dt_util
from .const import DOMAIN
from .coordinator import DucoConfigEntry, DucoCoordinator
from .entity import DucoEntity
_LOGGER = logging.getLogger(__name__)
PARALLEL_UPDATES = 0
@dataclass(frozen=True, kw_only=True)
class DucoSensorEntityDescription(SensorEntityDescription):
"""Duco sensor entity description."""
value_fn: Callable[[Node], datetime | int | float | str | None]
node_types: tuple[NodeType, ...]
@dataclass(frozen=True, kw_only=True)
class DucoBoxSensorEntityDescription(SensorEntityDescription):
"""Duco sensor entity description for box-level diagnostic data."""
value_fn: Callable[[DucoCoordinator], int | float | None]
SENSOR_DESCRIPTIONS: tuple[DucoSensorEntityDescription, ...] = (
DucoSensorEntityDescription(
key="ventilation_state",
translation_key="ventilation_state",
device_class=SensorDeviceClass.ENUM,
options=[
state.lower()
for state in VentilationState
if state != VentilationState.UNKNOWN
],
value_fn=lambda node: (
node.ventilation.state.lower()
if node.ventilation and node.ventilation.state != VentilationState.UNKNOWN
else None
),
node_types=(NodeType.BOX,),
),
DucoSensorEntityDescription(
key="target_flow_level",
translation_key="target_flow_level",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
suggested_display_precision=0,
value_fn=lambda node: (
node.ventilation.flow_lvl_tgt if node.ventilation else None
),
node_types=(NodeType.BOX,),
),
DucoSensorEntityDescription(
key="time_state_end",
translation_key="time_state_end",
device_class=SensorDeviceClass.TIMESTAMP,
value_fn=lambda node: (
dt_util.utc_from_timestamp(node.ventilation.time_state_end).replace(
second=0, microsecond=0
)
if node.ventilation and node.ventilation.time_state_end != 0
else None
),
node_types=(NodeType.BOX,),
),
DucoSensorEntityDescription(
key="co2",
device_class=SensorDeviceClass.CO2,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=CONCENTRATION_PARTS_PER_MILLION,
value_fn=lambda node: node.sensor.co2 if node.sensor else None,
node_types=(NodeType.UCCO2, NodeType.VLVCO2, NodeType.VLVCO2RH),
),
DucoSensorEntityDescription(
key="iaq_co2",
translation_key="iaq_co2",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
entity_registry_enabled_default=False,
value_fn=lambda node: node.sensor.iaq_co2 if node.sensor else None,
node_types=(NodeType.UCCO2, NodeType.VLVCO2, NodeType.VLVCO2RH),
),
DucoSensorEntityDescription(
key="humidity",
device_class=SensorDeviceClass.HUMIDITY,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
value_fn=lambda node: node.sensor.rh if node.sensor else None,
node_types=(NodeType.BSRH, NodeType.UCRH, NodeType.VLVRH, NodeType.VLVCO2RH),
),
DucoSensorEntityDescription(
key="iaq_rh",
translation_key="iaq_rh",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
entity_registry_enabled_default=False,
value_fn=lambda node: node.sensor.iaq_rh if node.sensor else None,
node_types=(NodeType.BSRH, NodeType.UCRH, NodeType.VLVRH, NodeType.VLVCO2RH),
),
)
BOX_SENSOR_DESCRIPTIONS: tuple[DucoBoxSensorEntityDescription, ...] = (
DucoBoxSensorEntityDescription(
key="rssi_wifi",
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
value_fn=lambda coordinator: coordinator.data.rssi_wifi,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: DucoConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Duco sensor entities."""
coordinator = entry.runtime_data
# Track the node IDs for which entities have already been created, so we
# can detect both newly added and stale (deregistered) nodes on every
# coordinator update.
known_nodes: set[int] = set()
@callback
def _async_add_new_entities() -> None:
"""Add new sensor entities and remove stale ones on coordinator updates."""
# Remove devices whose nodes have disappeared from the API.
# The firmware removes deregistered RF/wired nodes automatically.
# BSRH box sensors that are physically unplugged from the PCB are
# not deregistered by the firmware and will never appear here as stale.
stale_node_ids = known_nodes - coordinator.data.nodes.keys()
if stale_node_ids:
device_reg = dr.async_get(hass)
mac = entry.unique_id
for node_id in stale_node_ids:
device = device_reg.async_get_device(
identifiers={(DOMAIN, f"{mac}_{node_id}")}
)
if device:
device_reg.async_update_device(
device.id,
remove_config_entry_id=entry.entry_id,
)
known_nodes.difference_update(stale_node_ids)
new_entities: list[SensorEntity] = []
for node in coordinator.data.nodes.values():
if node.node_id in known_nodes:
continue
if node.general.node_type == NodeType.UNKNOWN:
# Do not add the node to known_nodes so that it is re-evaluated
# on every coordinator update. This allows entities to be
# created automatically once a firmware update or library
# update adds support for the device type.
_LOGGER.debug(
"Duco node %s (%s) has an unsupported device type and will be "
"retried on subsequent coordinator updates",
node.node_id,
node.general.name,
)
continue
known_nodes.add(node.node_id)
new_entities.extend(
DucoSensorEntity(coordinator, node, description)
for description in SENSOR_DESCRIPTIONS
if node.general.node_type in description.node_types
)
new_entities.extend(
DucoBoxSensorEntity(coordinator, node, description)
for description in BOX_SENSOR_DESCRIPTIONS
if node.general.node_type == NodeType.BOX
)
if new_entities:
async_add_entities(new_entities)
entry.async_on_unload(coordinator.async_add_listener(_async_add_new_entities))
_async_add_new_entities()
class DucoSensorEntity(DucoEntity, SensorEntity):
"""Sensor entity for a Duco node."""
entity_description: DucoSensorEntityDescription
def __init__(
self,
coordinator: DucoCoordinator,
node: Node,
description: DucoSensorEntityDescription,
) -> None:
"""Initialize the sensor entity."""
super().__init__(coordinator, node)
self.entity_description = description
self._attr_unique_id = (
f"{coordinator.config_entry.unique_id}_{node.node_id}_{description.key}"
)
@property
def native_value(self) -> datetime | int | float | str | None:
"""Return the sensor value."""
return self.entity_description.value_fn(self._node)
class DucoBoxSensorEntity(DucoEntity, SensorEntity):
"""Sensor entity for box-level diagnostic data."""
entity_description: DucoBoxSensorEntityDescription
def __init__(
self,
coordinator: DucoCoordinator,
node: Node,
description: DucoBoxSensorEntityDescription,
) -> None:
"""Initialize the box sensor entity."""
super().__init__(coordinator, node)
self.entity_description = description
self._attr_unique_id = (
f"{coordinator.config_entry.unique_id}_{node.node_id}_{description.key}"
)
@property
def native_value(self) -> int | float | None:
"""Return the sensor value."""
return self.entity_description.value_fn(self.coordinator)