1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-21 03:20:01 +00:00
Files
core/homeassistant/components/unifiprotect/event.py
2025-12-12 16:23:11 +01:00

431 lines
16 KiB
Python

"""Platform providing event entities for UniFi Protect."""
from __future__ import annotations
import dataclasses
from typing import Any
from uiprotect.data.nvr import Event, EventDetectedThumbnail
from homeassistant.components.event import (
EventDeviceClass,
EventEntity,
EventEntityDescription,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.event import async_call_at
from . import Bootstrap
from .const import (
ATTR_EVENT_ID,
EVENT_TYPE_DOORBELL_RING,
EVENT_TYPE_FINGERPRINT_IDENTIFIED,
EVENT_TYPE_FINGERPRINT_NOT_IDENTIFIED,
EVENT_TYPE_NFC_SCANNED,
EVENT_TYPE_VEHICLE_DETECTED,
KEYRINGS_KEY_TYPE_ID_NFC,
KEYRINGS_ULP_ID,
KEYRINGS_USER_FULL_NAME,
KEYRINGS_USER_STATUS,
VEHICLE_EVENT_DELAY_SECONDS,
)
from .data import (
Camera,
EventType,
ProtectAdoptableDeviceModel,
ProtectData,
ProtectDeviceType,
UFPConfigEntry,
)
from .entity import EventEntityMixin, ProtectDeviceEntity, ProtectEventMixin
PARALLEL_UPDATES = 0
# Select best thumbnail
# Prefer thumbnails with LPR data, sorted by confidence
# LPR can be in: 1) group.matched_name (UFP 6.0+) or 2) name field
def _thumbnail_sort_key(t: EventDetectedThumbnail) -> tuple[bool, float, float]:
"""Sort key: (has_lpr, confidence, clock_best_wall)."""
has_lpr = bool((t.group and t.group.matched_name) or (t.name and len(t.name) > 0))
confidence = t.confidence if t.confidence else 0.0
clock = t.clock_best_wall.timestamp() if t.clock_best_wall else 0.0
return (has_lpr, confidence, clock)
def _add_ulp_user_infos(
bootstrap: Bootstrap, event_data: dict[str, str], ulp_id: str
) -> None:
"""Add ULP user information to the event data."""
if ulp_usr := bootstrap.ulp_users.by_ulp_id(ulp_id):
event_data.update(
{
KEYRINGS_ULP_ID: ulp_usr.ulp_id,
KEYRINGS_USER_FULL_NAME: ulp_usr.full_name,
KEYRINGS_USER_STATUS: ulp_usr.status,
}
)
@dataclasses.dataclass(frozen=True, kw_only=True)
class ProtectEventEntityDescription(ProtectEventMixin, EventEntityDescription):
"""Describes UniFi Protect event entity."""
entity_class: type[ProtectDeviceEntity]
class ProtectDeviceRingEventEntity(EventEntityMixin, ProtectDeviceEntity, EventEntity):
"""A UniFi Protect event entity."""
entity_description: ProtectEventEntityDescription
@callback
def _async_update_device_from_protect(self, device: ProtectDeviceType) -> None:
description = self.entity_description
prev_event = self._event
prev_event_end = self._event_end
super()._async_update_device_from_protect(device)
if event := description.get_event_obj(device):
self._event = event
self._event_end = event.end if event else None
if (
event
and not self._event_already_ended(prev_event, prev_event_end)
and event.type is EventType.RING
):
self._trigger_event(EVENT_TYPE_DOORBELL_RING, {ATTR_EVENT_ID: event.id})
self.async_write_ha_state()
class ProtectDeviceNFCEventEntity(EventEntityMixin, ProtectDeviceEntity, EventEntity):
"""A UniFi Protect NFC event entity."""
entity_description: ProtectEventEntityDescription
@callback
def _async_update_device_from_protect(self, device: ProtectDeviceType) -> None:
description = self.entity_description
prev_event = self._event
prev_event_end = self._event_end
super()._async_update_device_from_protect(device)
if event := description.get_event_obj(device):
self._event = event
self._event_end = event.end if event else None
if (
event
and not self._event_already_ended(prev_event, prev_event_end)
and event.type is EventType.NFC_CARD_SCANNED
):
event_data = {
ATTR_EVENT_ID: event.id,
KEYRINGS_USER_FULL_NAME: "",
KEYRINGS_ULP_ID: "",
KEYRINGS_USER_STATUS: "",
KEYRINGS_KEY_TYPE_ID_NFC: "",
}
if event.metadata and event.metadata.nfc and event.metadata.nfc.nfc_id:
nfc_id = event.metadata.nfc.nfc_id
event_data[KEYRINGS_KEY_TYPE_ID_NFC] = nfc_id
keyring = self.data.api.bootstrap.keyrings.by_registry_id(nfc_id)
if keyring and keyring.ulp_user:
_add_ulp_user_infos(
self.data.api.bootstrap, event_data, keyring.ulp_user
)
self._trigger_event(EVENT_TYPE_NFC_SCANNED, event_data)
self.async_write_ha_state()
class ProtectDeviceFingerprintEventEntity(
EventEntityMixin, ProtectDeviceEntity, EventEntity
):
"""A UniFi Protect fingerprint event entity."""
entity_description: ProtectEventEntityDescription
@callback
def _async_update_device_from_protect(self, device: ProtectDeviceType) -> None:
description = self.entity_description
prev_event = self._event
prev_event_end = self._event_end
super()._async_update_device_from_protect(device)
if event := description.get_event_obj(device):
self._event = event
self._event_end = event.end if event else None
if (
event
and not self._event_already_ended(prev_event, prev_event_end)
and event.type is EventType.FINGERPRINT_IDENTIFIED
):
event_data = {
ATTR_EVENT_ID: event.id,
KEYRINGS_USER_FULL_NAME: "",
KEYRINGS_ULP_ID: "",
}
event_identified = EVENT_TYPE_FINGERPRINT_NOT_IDENTIFIED
if (
event.metadata
and event.metadata.fingerprint
and event.metadata.fingerprint.ulp_id
):
event_identified = EVENT_TYPE_FINGERPRINT_IDENTIFIED
ulp_id = event.metadata.fingerprint.ulp_id
if ulp_id:
event_data[KEYRINGS_ULP_ID] = ulp_id
_add_ulp_user_infos(self.data.api.bootstrap, event_data, ulp_id)
self._trigger_event(event_identified, event_data)
self.async_write_ha_state()
class ProtectDeviceVehicleEventEntity(
EventEntityMixin, ProtectDeviceEntity, EventEntity
):
"""A UniFi Protect vehicle detection event entity.
Vehicle detection events use a delayed firing mechanism to allow time for
the best thumbnail (with license plate recognition data) to arrive. The
timer is extended each time new thumbnails arrive for the same event. If
a new event arrives while a timer is pending, the old event fires immediately
with its stored thumbnails, then a new timer starts for the new event.
"""
entity_description: ProtectEventEntityDescription
_thumbnail_timer_cancel: CALLBACK_TYPE | None = None
_latest_event_id: str | None = None
_latest_thumbnails: list[EventDetectedThumbnail] | None = None
_thumbnail_timer_due: float = 0.0 # Loop time when timer should fire
_fired_event_id: str | None = None # Track last fired event to prevent duplicates
_fired_event_data: dict[str, Any] | None = None # Track event data when fired
async def async_added_to_hass(self) -> None:
"""Register cleanup callback when entity is added."""
await super().async_added_to_hass()
self.async_on_remove(self._cancel_thumbnail_timer)
@callback
def _cancel_thumbnail_timer(self) -> None:
"""Cancel pending thumbnail timer if one exists."""
if self._thumbnail_timer_cancel:
self._thumbnail_timer_cancel()
self._thumbnail_timer_cancel = None
@callback
def _async_timer_callback(self, *_: Any) -> None:
"""Handle timer expiration - fire the vehicle event.
If the due time was extended (new thumbnails arrived), re-arm the timer.
Otherwise, fire the event with the stored thumbnails.
"""
self._thumbnail_timer_cancel = None
if self._thumbnail_timer_due > self.hass.loop.time():
# Timer fired early because due time was extended; re-arm
self._async_set_thumbnail_timer()
return
if self._latest_event_id:
self._fire_vehicle_event(self._latest_event_id, self._latest_thumbnails)
@staticmethod
def _get_vehicle_thumbnails(event: Event) -> list[EventDetectedThumbnail]:
"""Get vehicle thumbnails from event."""
if event.metadata and event.metadata.detected_thumbnails:
return [
t for t in event.metadata.detected_thumbnails if t.type == "vehicle"
]
return []
@staticmethod
def _build_event_data(
event_id: str, thumbnails: list[EventDetectedThumbnail]
) -> dict[str, Any]:
"""Build event data dictionary from thumbnails."""
event_data: dict[str, Any] = {
ATTR_EVENT_ID: event_id,
"thumbnail_count": len(thumbnails),
}
thumbnail = max(thumbnails, key=_thumbnail_sort_key)
# Add confidence if available
if thumbnail.confidence is not None:
event_data["confidence"] = thumbnail.confidence
# Add best detection frame timestamp
if thumbnail.clock_best_wall is not None:
event_data["clock_best_wall"] = thumbnail.clock_best_wall.isoformat()
# License plate from group.matched_name (UFP 6.0+) or name field (older)
if thumbnail.group and thumbnail.group.matched_name:
event_data["license_plate"] = thumbnail.group.matched_name
elif thumbnail.name:
event_data["license_plate"] = thumbnail.name
# Add all thumbnail attributes as dict
if thumbnail.attributes:
event_data["attributes"] = thumbnail.attributes.unifi_dict()
return event_data
@callback
def _fire_vehicle_event(
self, event_id: str, thumbnails: list[EventDetectedThumbnail] | None = None
) -> None:
"""Fire the vehicle detection event with best available thumbnail.
Args:
event_id: The event ID to include in the fired event data.
thumbnails: Pre-stored thumbnails to use. If None, fetches from
the current event (used when event is still active).
"""
if thumbnails is None:
# No stored thumbnails; try to get from current event
event = self.entity_description.get_event_obj(self.device)
if not event or event.id != event_id:
return
thumbnails = self._get_vehicle_thumbnails(event)
if not thumbnails:
return
event_data = self._build_event_data(event_id, thumbnails)
# Prevent duplicate firing of same event with same data
if self._fired_event_id == event_id and self._fired_event_data == event_data:
return
# Mark this event as fired with its data
self._fired_event_id = event_id
self._fired_event_data = event_data
self._trigger_event(EVENT_TYPE_VEHICLE_DETECTED, event_data)
self.async_write_ha_state()
@callback
def _async_set_thumbnail_timer(self) -> None:
"""Schedule the thumbnail timer to fire at _thumbnail_timer_due."""
self._thumbnail_timer_cancel = async_call_at(
self.hass,
self._async_timer_callback,
self._thumbnail_timer_due,
)
@callback
def _async_update_device_from_protect(self, device: ProtectDeviceType) -> None:
description = self.entity_description
super()._async_update_device_from_protect(device)
if event := description.get_event_obj(device):
self._event = event
self._event_end = event.end if event else None
# Process vehicle detection events with thumbnails
if (
event
and event.type is EventType.SMART_DETECT
and (thumbnails := self._get_vehicle_thumbnails(event))
):
# Skip if same event with same data (no changes)
if (
self._fired_event_id == event.id
and self._fired_event_data
== self._build_event_data(event.id, thumbnails)
):
return
# New event arrived while timer pending for different event?
# Fire the old event immediately since it has completed
if self._latest_event_id and self._latest_event_id != event.id:
# Only fire if we haven't already (shouldn't happen, but defensive)
self._fire_vehicle_event(self._latest_event_id, self._latest_thumbnails)
self._cancel_thumbnail_timer()
# Store event data and extend/start the timer
# Timer extension allows better thumbnails (with LPR) to arrive
self._latest_event_id = event.id
self._latest_thumbnails = thumbnails
self._thumbnail_timer_due = (
self.hass.loop.time() + VEHICLE_EVENT_DELAY_SECONDS
)
# Only schedule if no timer running; existing timer will re-arm
if self._thumbnail_timer_cancel is None:
self._async_set_thumbnail_timer()
EVENT_DESCRIPTIONS: tuple[ProtectEventEntityDescription, ...] = (
ProtectEventEntityDescription(
key="doorbell",
translation_key="doorbell",
device_class=EventDeviceClass.DOORBELL,
ufp_required_field="feature_flags.is_doorbell",
ufp_event_obj="last_ring_event",
event_types=[EVENT_TYPE_DOORBELL_RING],
entity_class=ProtectDeviceRingEventEntity,
),
ProtectEventEntityDescription(
key="nfc",
translation_key="nfc",
ufp_required_field="feature_flags.support_nfc",
ufp_event_obj="last_nfc_card_scanned_event",
event_types=[EVENT_TYPE_NFC_SCANNED],
entity_class=ProtectDeviceNFCEventEntity,
),
ProtectEventEntityDescription(
key="fingerprint",
translation_key="fingerprint",
ufp_required_field="feature_flags.has_fingerprint_sensor",
ufp_event_obj="last_fingerprint_identified_event",
event_types=[
EVENT_TYPE_FINGERPRINT_IDENTIFIED,
EVENT_TYPE_FINGERPRINT_NOT_IDENTIFIED,
],
entity_class=ProtectDeviceFingerprintEventEntity,
),
ProtectEventEntityDescription(
key="vehicle",
translation_key="vehicle",
ufp_required_field="feature_flags.has_smart_detect",
ufp_event_obj="last_smart_detect_event",
event_types=[EVENT_TYPE_VEHICLE_DETECTED],
entity_class=ProtectDeviceVehicleEventEntity,
),
)
@callback
def _async_event_entities(
data: ProtectData,
ufp_device: ProtectAdoptableDeviceModel | None = None,
) -> list[ProtectDeviceEntity]:
return [
description.entity_class(data, device, description)
for device in (data.get_cameras() if ufp_device is None else [ufp_device])
for description in EVENT_DESCRIPTIONS
if description.has_required(device)
]
async def async_setup_entry(
hass: HomeAssistant,
entry: UFPConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up event entities for UniFi Protect integration."""
data = entry.runtime_data
@callback
def _add_new_device(device: ProtectAdoptableDeviceModel) -> None:
if device.is_adopted and isinstance(device, Camera):
async_add_entities(_async_event_entities(data, ufp_device=device))
data.async_subscribe_adopt(_add_new_device)
async_add_entities(_async_event_entities(data))