mirror of
https://github.com/home-assistant/core.git
synced 2026-02-21 10:27:52 +00:00
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Paul Tarjan <ptarjan@users.noreply.github.com> Co-authored-by: Joostlek <joostlek@outlook.com>
161 lines
5.3 KiB
Python
161 lines
5.3 KiB
Python
"""The Hikvision integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
import logging
|
|
from xml.etree.ElementTree import ParseError
|
|
|
|
from pyhik.constants import SENSOR_MAP
|
|
from pyhik.hikvision import HikCamera, VideoChannel, get_video_channels
|
|
import requests
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import (
|
|
CONF_HOST,
|
|
CONF_PASSWORD,
|
|
CONF_PORT,
|
|
CONF_SSL,
|
|
CONF_USERNAME,
|
|
Platform,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryNotReady
|
|
from homeassistant.helpers import device_registry as dr
|
|
|
|
from .const import DOMAIN
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
PLATFORMS = [Platform.BINARY_SENSOR, Platform.CAMERA]
|
|
|
|
|
|
@dataclass
|
|
class HikvisionData:
|
|
"""Data class for Hikvision runtime data."""
|
|
|
|
camera: HikCamera
|
|
device_id: str
|
|
device_name: str
|
|
device_type: str
|
|
host: str
|
|
username: str
|
|
password: str
|
|
channels: dict[int, VideoChannel] = field(default_factory=dict)
|
|
|
|
|
|
type HikvisionConfigEntry = ConfigEntry[HikvisionData]
|
|
|
|
|
|
async def async_setup_entry(hass: HomeAssistant, entry: HikvisionConfigEntry) -> bool:
|
|
"""Set up Hikvision from a config entry."""
|
|
host = entry.data[CONF_HOST]
|
|
port = entry.data[CONF_PORT]
|
|
username = entry.data[CONF_USERNAME]
|
|
password = entry.data[CONF_PASSWORD]
|
|
ssl = entry.data[CONF_SSL]
|
|
|
|
protocol = "https" if ssl else "http"
|
|
url = f"{protocol}://{host}"
|
|
|
|
try:
|
|
camera = await hass.async_add_executor_job(
|
|
HikCamera, url, port, username, password, ssl
|
|
)
|
|
except requests.exceptions.RequestException as err:
|
|
raise ConfigEntryNotReady(f"Unable to connect to {host}") from err
|
|
|
|
device_id = camera.get_id
|
|
if device_id is None:
|
|
raise ConfigEntryNotReady(f"Unable to get device ID from {host}")
|
|
|
|
device_name = camera.get_name or host
|
|
device_type = camera.get_type or "Camera"
|
|
|
|
# For NVRs, fetch video channel information
|
|
channels: dict[int, VideoChannel] = {}
|
|
if device_type == "NVR":
|
|
channel_list = await hass.async_add_executor_job(
|
|
get_video_channels, host, port, username, password, ssl
|
|
)
|
|
channels = {ch.id: ch for ch in channel_list}
|
|
_LOGGER.debug("Found %d video channels", len(channels))
|
|
|
|
entry.runtime_data = HikvisionData(
|
|
camera=camera,
|
|
device_id=device_id,
|
|
device_name=device_name,
|
|
device_type=device_type,
|
|
host=host,
|
|
username=username,
|
|
password=password,
|
|
channels=channels,
|
|
)
|
|
|
|
_LOGGER.debug(
|
|
"Device %s (type=%s) initial event_states: %s",
|
|
device_name,
|
|
device_type,
|
|
camera.current_event_states,
|
|
)
|
|
|
|
# For NVRs or devices with no detected events, try to fetch events from ISAPI
|
|
# Use broader notification methods for NVRs since they often use 'record' etc.
|
|
if device_type == "NVR" or not camera.current_event_states:
|
|
nvr_notification_methods = {"center", "HTTP", "record", "email", "beep"}
|
|
|
|
def fetch_and_inject_nvr_events() -> None:
|
|
"""Fetch and inject NVR events in a single executor job."""
|
|
try:
|
|
nvr_events = camera.get_event_triggers(nvr_notification_methods)
|
|
except (requests.exceptions.RequestException, ParseError) as err:
|
|
_LOGGER.warning("Unable to fetch event triggers from %s: %s", host, err)
|
|
return
|
|
|
|
_LOGGER.debug("NVR events fetched with extended methods: %s", nvr_events)
|
|
if nvr_events:
|
|
# Map raw event type names to friendly names using SENSOR_MAP
|
|
mapped_events: dict[str, list[int]] = {}
|
|
for event_type, channels in nvr_events.items():
|
|
friendly_name = SENSOR_MAP.get(event_type.lower(), event_type)
|
|
if friendly_name in mapped_events:
|
|
mapped_events[friendly_name].extend(channels)
|
|
else:
|
|
mapped_events[friendly_name] = list(channels)
|
|
_LOGGER.debug("Mapped NVR events: %s", mapped_events)
|
|
camera.inject_events(mapped_events)
|
|
else:
|
|
_LOGGER.debug(
|
|
"No event triggers returned from %s. "
|
|
"Ensure events are configured on the device",
|
|
host,
|
|
)
|
|
|
|
await hass.async_add_executor_job(fetch_and_inject_nvr_events)
|
|
|
|
# Start the event stream
|
|
await hass.async_add_executor_job(camera.start_stream)
|
|
|
|
# Register the main device before platforms that use via_device
|
|
device_registry = dr.async_get(hass)
|
|
device_registry.async_get_or_create(
|
|
config_entry_id=entry.entry_id,
|
|
identifiers={(DOMAIN, device_id)},
|
|
name=device_name,
|
|
manufacturer="Hikvision",
|
|
model=device_type,
|
|
)
|
|
|
|
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
|
|
|
return True
|
|
|
|
|
|
async def async_unload_entry(hass: HomeAssistant, entry: HikvisionConfigEntry) -> bool:
|
|
"""Unload a config entry."""
|
|
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
|
|
# Stop the event stream
|
|
await hass.async_add_executor_job(entry.runtime_data.camera.disconnect)
|
|
|
|
return unload_ok
|