1
0
mirror of https://github.com/home-assistant/core.git synced 2026-05-30 04:05:01 +01:00
Files
core/homeassistant/components/trend/binary_sensor.py
2026-04-30 21:14:48 +02:00

294 lines
9.6 KiB
Python

"""A sensor that monitors trends in other components."""
from collections import deque
from collections.abc import Mapping
import logging
import math
from typing import Any
import numpy as np
import voluptuous as vol
from homeassistant.components.binary_sensor import (
DEVICE_CLASSES_SCHEMA,
ENTITY_ID_FORMAT,
PLATFORM_SCHEMA as BINARY_SENSOR_PLATFORM_SCHEMA,
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_ATTRIBUTE,
CONF_DEVICE_CLASS,
CONF_ENTITY_ID,
CONF_FRIENDLY_NAME,
CONF_SENSORS,
CONF_UNIQUE_ID,
STATE_ON,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Event, EventStateChangedData, HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.device import async_entity_id_to_device
from homeassistant.helpers.entity import generate_entity_id
from homeassistant.helpers.entity_platform import (
AddConfigEntryEntitiesCallback,
AddEntitiesCallback,
)
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util.dt import utcnow
from . import PLATFORMS
from .const import (
ATTR_GRADIENT,
ATTR_INVERT,
ATTR_MIN_GRADIENT,
ATTR_SAMPLE_COUNT,
ATTR_SAMPLE_DURATION,
CONF_INVERT,
CONF_MAX_SAMPLES,
CONF_MIN_GRADIENT,
CONF_MIN_SAMPLES,
CONF_SAMPLE_DURATION,
DEFAULT_MAX_SAMPLES,
DEFAULT_MIN_GRADIENT,
DEFAULT_MIN_SAMPLES,
DEFAULT_SAMPLE_DURATION,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
def _validate_min_max(data: dict[str, Any]) -> dict[str, Any]:
if (
CONF_MIN_SAMPLES in data
and CONF_MAX_SAMPLES in data
and data[CONF_MAX_SAMPLES] < data[CONF_MIN_SAMPLES]
):
raise vol.Invalid("min_samples must be smaller than or equal to max_samples")
return data
SENSOR_SCHEMA = vol.All(
vol.Schema(
{
vol.Required(CONF_ENTITY_ID): cv.entity_id,
vol.Optional(CONF_ATTRIBUTE): cv.string,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_FRIENDLY_NAME): cv.string,
vol.Optional(CONF_INVERT, default=False): cv.boolean,
vol.Optional(CONF_MAX_SAMPLES, default=2): cv.positive_int,
vol.Optional(CONF_MIN_GRADIENT, default=0.0): vol.Coerce(float),
vol.Optional(CONF_SAMPLE_DURATION, default=0): cv.positive_int,
vol.Optional(CONF_MIN_SAMPLES, default=2): cv.positive_int,
vol.Optional(CONF_UNIQUE_ID): cv.string,
}
),
_validate_min_max,
)
PLATFORM_SCHEMA = BINARY_SENSOR_PLATFORM_SCHEMA.extend(
{vol.Required(CONF_SENSORS): cv.schema_with_slug_keys(SENSOR_SCHEMA)}
)
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the trend sensors."""
await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
entities = []
for sensor_name, sensor_config in config[CONF_SENSORS].items():
entities.append(
SensorTrend(
hass,
name=sensor_config.get(CONF_FRIENDLY_NAME, sensor_name),
entity_id=sensor_config[CONF_ENTITY_ID],
attribute=sensor_config.get(CONF_ATTRIBUTE),
invert=sensor_config[CONF_INVERT],
sample_duration=sensor_config[CONF_SAMPLE_DURATION],
min_gradient=sensor_config[CONF_MIN_GRADIENT],
min_samples=sensor_config[CONF_MIN_SAMPLES],
max_samples=sensor_config[CONF_MAX_SAMPLES],
device_class=sensor_config.get(CONF_DEVICE_CLASS),
unique_id=sensor_config.get(CONF_UNIQUE_ID),
sensor_entity_id=generate_entity_id(
ENTITY_ID_FORMAT, sensor_name, hass=hass
),
)
)
async_add_entities(entities)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up trend sensor from config entry."""
async_add_entities(
[
SensorTrend(
hass,
name=entry.title,
entity_id=entry.options[CONF_ENTITY_ID],
attribute=entry.options.get(CONF_ATTRIBUTE),
invert=entry.options[CONF_INVERT],
sample_duration=entry.options.get(
CONF_SAMPLE_DURATION, DEFAULT_SAMPLE_DURATION
),
min_gradient=entry.options.get(CONF_MIN_GRADIENT, DEFAULT_MIN_GRADIENT),
min_samples=entry.options.get(CONF_MIN_SAMPLES, DEFAULT_MIN_SAMPLES),
max_samples=entry.options.get(CONF_MAX_SAMPLES, DEFAULT_MAX_SAMPLES),
unique_id=entry.entry_id,
)
]
)
class SensorTrend(BinarySensorEntity, RestoreEntity):
"""Representation of a trend Sensor."""
_attr_should_poll = False
_gradient = 0.0
_state: bool | None = None
def __init__(
self,
hass: HomeAssistant,
*,
name: str,
entity_id: str,
attribute: str | None,
invert: bool,
sample_duration: int,
min_gradient: float,
min_samples: int,
max_samples: int,
unique_id: str | None = None,
device_class: BinarySensorDeviceClass | None = None,
sensor_entity_id: str | None = None,
) -> None:
"""Initialize the sensor."""
self._entity_id = entity_id
self._attribute = attribute
self._invert = invert
self._sample_duration = sample_duration
self._min_gradient = min_gradient
self._min_samples = min_samples
self.samples: deque = deque(maxlen=int(max_samples))
self._attr_name = name
self._attr_device_class = device_class
self._attr_unique_id = unique_id
self.device_entry = async_entity_id_to_device(
hass,
entity_id,
)
if sensor_entity_id:
self.entity_id = sensor_entity_id
@property
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return the state attributes of the sensor."""
return {
ATTR_ENTITY_ID: self._entity_id,
ATTR_GRADIENT: self._gradient,
ATTR_INVERT: self._invert,
ATTR_MIN_GRADIENT: self._min_gradient,
ATTR_SAMPLE_COUNT: len(self.samples),
ATTR_SAMPLE_DURATION: self._sample_duration,
}
async def async_added_to_hass(self) -> None:
"""Complete device setup after being added to hass."""
@callback
def trend_sensor_state_listener(
event: Event[EventStateChangedData],
) -> None:
"""Handle state changes on the observed device."""
if (new_state := event.data["new_state"]) is None:
return
try:
if self._attribute:
state = new_state.attributes.get(self._attribute)
else:
state = new_state.state
if state in (STATE_UNKNOWN, STATE_UNAVAILABLE):
self._attr_available = False
else:
self._attr_available = True
sample = (new_state.last_updated.timestamp(), float(state)) # type: ignore[arg-type]
self.samples.append(sample)
self.async_schedule_update_ha_state(True)
except (ValueError, TypeError) as ex:
_LOGGER.error(
"Error processing sensor state change for "
"entity_id=%s, attribute=%s, state=%s: %s",
self._entity_id,
self._attribute,
new_state.state,
ex,
)
self.async_on_remove(
async_track_state_change_event(
self.hass, [self._entity_id], trend_sensor_state_listener
)
)
if not (state := await self.async_get_last_state()):
return
if state.state in {STATE_UNKNOWN, STATE_UNAVAILABLE}:
return
self._attr_is_on = state.state == STATE_ON
async def async_update(self) -> None:
"""Get the latest data and update the states."""
# Remove outdated samples
if self._sample_duration > 0:
cutoff = utcnow().timestamp() - self._sample_duration
while self.samples and self.samples[0][0] < cutoff:
self.samples.popleft()
if len(self.samples) < self._min_samples:
return
# Calculate gradient of linear trend
await self.hass.async_add_executor_job(self._calculate_gradient)
# Update state
self._attr_is_on = (
abs(self._gradient) > abs(self._min_gradient)
and math.copysign(self._gradient, self._min_gradient) == self._gradient
)
if self._invert:
self._attr_is_on = not self._attr_is_on
def _calculate_gradient(self) -> None:
"""Compute the linear trend gradient of the current samples.
This need run inside executor.
"""
timestamps = np.array([t for t, _ in self.samples])
values = np.array([s for _, s in self.samples])
coeffs = np.polyfit(timestamps, values, 1)
self._gradient = coeffs[0]