"""Provide a registry to track entity IDs. The Entity Registry keeps a registry of entities. Entities are uniquely identified by their domain, platform and a unique id provided by that platform. The Entity Registry will persist itself 10 seconds after a new entity is registered. Registering a new entity while a timer is in progress resets the timer. """ from __future__ import annotations from collections import defaultdict from collections.abc import Callable, Hashable, KeysView, Mapping from datetime import datetime, timedelta from enum import Enum, StrEnum import logging import time from typing import TYPE_CHECKING, Any, Literal, NotRequired, TypedDict import attr import voluptuous as vol from homeassistant.const import ( ATTR_DEVICE_CLASS, ATTR_FRIENDLY_NAME, ATTR_ICON, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES, ATTR_UNIT_OF_MEASUREMENT, EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, MAX_LENGTH_STATE_DOMAIN, MAX_LENGTH_STATE_ENTITY_ID, STATE_UNAVAILABLE, STATE_UNKNOWN, EntityCategory, Platform, ) from homeassistant.core import ( Event, HomeAssistant, callback, split_entity_id, valid_entity_id, ) from homeassistant.exceptions import MaxLengthExceeded from homeassistant.loader import async_suggest_report_issue from homeassistant.util import slugify, uuid as uuid_util from homeassistant.util.dt import utc_from_timestamp, utcnow from homeassistant.util.event_type import EventType from homeassistant.util.hass_dict import HassKey from homeassistant.util.json import format_unserializable_data from homeassistant.util.read_only_dict import ReadOnlyDict from . import device_registry as dr, storage from .device_registry import ( EVENT_DEVICE_REGISTRY_UPDATED, EventDeviceRegistryUpdatedData, ) from .frame import ReportBehavior, report_usage from .json import JSON_DUMP, find_paths_unserializable_data, json_bytes, json_fragment from .registry import BaseRegistry, BaseRegistryItems, RegistryIndexType from .singleton import singleton from .typing import UNDEFINED, UndefinedType if TYPE_CHECKING: # mypy cannot workout _cache Protocol with attrs from propcache.api import cached_property as under_cached_property from homeassistant.config_entries import ConfigEntry else: from propcache.api import under_cached_property DATA_REGISTRY: HassKey[EntityRegistry] = HassKey("entity_registry") EVENT_ENTITY_REGISTRY_UPDATED: EventType[EventEntityRegistryUpdatedData] = EventType( "entity_registry_updated" ) _LOGGER = logging.getLogger(__name__) STORAGE_VERSION_MAJOR = 1 STORAGE_VERSION_MINOR = 22 STORAGE_KEY = "core.entity_registry" CLEANUP_INTERVAL = 3600 * 24 ORPHANED_ENTITY_KEEP_SECONDS = 3600 * 24 * 30 ENTITY_CATEGORY_VALUE_TO_INDEX: dict[EntityCategory | None, int] = { val: idx for idx, val in enumerate(EntityCategory) } ENTITY_CATEGORY_INDEX_TO_VALUE = dict(enumerate(EntityCategory)) class ComputedNameType(Enum): """Singleton representing the computed full entity name in aliases.""" _singleton = 0 COMPUTED_NAME = ComputedNameType._singleton # noqa: SLF001 type AliasEntry = str | ComputedNameType def _serialize_aliases(aliases: list[AliasEntry]) -> list[str | None]: """Convert aliases to a JSON-serializable list.""" return [None if a is COMPUTED_NAME else a for a in aliases] def _deserialize_aliases(aliases: list[str | None]) -> list[AliasEntry]: """Convert aliases from JSON to internal representation.""" return [COMPUTED_NAME if a is None else a for a in aliases] # Attributes relevant to describing entity # to external services. ENTITY_DESCRIBING_ATTRIBUTES = { "capabilities", "device_class", "entity_id", "name", "original_name", "supported_features", "unit_of_measurement", } class RegistryEntryDisabler(StrEnum): """What disabled a registry entry.""" CONFIG_ENTRY = "config_entry" DEVICE = "device" HASS = "hass" INTEGRATION = "integration" USER = "user" class RegistryEntryHider(StrEnum): """What hid a registry entry.""" INTEGRATION = "integration" USER = "user" class _EventEntityRegistryUpdatedData_CreateRemove(TypedDict): """EventEntityRegistryUpdated data for action type 'create' and 'remove'.""" action: Literal["create", "remove"] entity_id: str class _EventEntityRegistryUpdatedData_Update(TypedDict): """EventEntityRegistryUpdated data for action type 'update'.""" action: Literal["update"] entity_id: str changes: dict[str, Any] # Required with action == "update" old_entity_id: NotRequired[str] type EventEntityRegistryUpdatedData = ( _EventEntityRegistryUpdatedData_CreateRemove | _EventEntityRegistryUpdatedData_Update ) type EntityOptionsType = Mapping[str, Mapping[str, Any]] type ReadOnlyEntityOptionsType = ReadOnlyDict[str, ReadOnlyDict[str, Any]] DISPLAY_DICT_OPTIONAL = ( # key, attr_name, convert_to_list ("ai", "area_id", False), ("lb", "labels", True), ("di", "device_id", False), ("ic", "icon", False), ("tk", "translation_key", False), ) def _protect_entity_options( data: EntityOptionsType | None, ) -> ReadOnlyEntityOptionsType: """Protect entity options from being modified.""" if data is None: return ReadOnlyDict({}) return ReadOnlyDict({key: ReadOnlyDict(val) for key, val in data.items()}) def _protect_optional_entity_options( data: EntityOptionsType | UndefinedType | None, ) -> ReadOnlyEntityOptionsType | UndefinedType: """Protect entity options from being modified.""" if data is UNDEFINED: return UNDEFINED if data is None: return ReadOnlyDict({}) return ReadOnlyDict({key: ReadOnlyDict(val) for key, val in data.items()}) @attr.s(frozen=True, kw_only=True, slots=True) class RegistryEntry: """Entity Registry Entry.""" entity_id: str = attr.ib() unique_id: str = attr.ib() platform: str = attr.ib() previous_unique_id: str | None = attr.ib(default=None) aliases: list[AliasEntry] = attr.ib(factory=list) area_id: str | None = attr.ib(default=None) categories: dict[str, str] = attr.ib(factory=dict) capabilities: Mapping[str, Any] | None = attr.ib() config_entry_id: str | None = attr.ib() config_subentry_id: str | None = attr.ib() created_at: datetime = attr.ib() device_class: str | None = attr.ib(default=None) device_id: str | None = attr.ib() domain: str = attr.ib(init=False, repr=False) disabled_by: RegistryEntryDisabler | None = attr.ib() entity_category: EntityCategory | None = attr.ib() has_entity_name: bool = attr.ib() hidden_by: RegistryEntryHider | None = attr.ib() icon: str | None = attr.ib(default=None) id: str = attr.ib( converter=attr.converters.default_if_none(factory=uuid_util.random_uuid_hex) # type: ignore[misc] ) labels: set[str] = attr.ib(factory=set) modified_at: datetime = attr.ib(factory=utcnow) name: str | None = attr.ib(default=None) object_id_base: str | None = attr.ib() options: ReadOnlyEntityOptionsType = attr.ib(converter=_protect_entity_options) # As set by integration original_device_class: str | None = attr.ib() original_icon: str | None = attr.ib() original_name: str | None = attr.ib() suggested_object_id: str | None = attr.ib() supported_features: int = attr.ib() translation_key: str | None = attr.ib() unit_of_measurement: str | None = attr.ib() # For backwards compatibility, should be removed in the future compat_aliases: list[str] = attr.ib(factory=list, eq=False) # original_name_unprefixed is used to store the result of stripping # the device name prefix from the original_name, if possible. # Used only for integrations that do not set has_entity_name to True. original_name_unprefixed: str | None = attr.ib(default=None, eq=False) _cache: dict[str, Any] = attr.ib(factory=dict, eq=False, init=False) @domain.default def _domain_default(self) -> str: """Compute domain value.""" return split_entity_id(self.entity_id)[0] @property def disabled(self) -> bool: """Return if entry is disabled.""" return self.disabled_by is not None @property def hidden(self) -> bool: """Return if entry is hidden.""" return self.hidden_by is not None @property def _as_display_dict(self) -> dict[str, Any] | None: """Return a partial dict representation of the entry. This version only includes what's needed for display. Returns None if there's no data needed for display. """ display_dict: dict[str, Any] = {"ei": self.entity_id, "pl": self.platform} for key, attr_name, convert_list in DISPLAY_DICT_OPTIONAL: if (attr_val := getattr(self, attr_name)) is not None: # Convert sets and tuples to lists # so the JSON serializer does not have to do # it every time display_dict[key] = list(attr_val) if convert_list else attr_val if (category := self.entity_category) is not None: display_dict["ec"] = ENTITY_CATEGORY_VALUE_TO_INDEX[category] if self.hidden_by is not None: display_dict["hb"] = True if self.has_entity_name: display_dict["hn"] = True name = ( self.name if self.name is not None else ( self.original_name_unprefixed if self.original_name_unprefixed is not None else self.original_name ) ) if name is not None: display_dict["en"] = name if self.domain == "sensor" and (sensor_options := self.options.get("sensor")): if (precision := sensor_options.get("display_precision")) is not None or ( precision := sensor_options.get("suggested_display_precision") ) is not None: display_dict["dp"] = precision return display_dict @under_cached_property def display_json_repr(self) -> bytes | None: """Return a cached partial JSON representation of the entry. This version only includes what's needed for display. """ try: dict_repr = self._as_display_dict json_repr: bytes | None = json_bytes(dict_repr) if dict_repr else None except ValueError, TypeError: _LOGGER.error( "Unable to serialize entry %s to JSON. Bad data found at %s", self.entity_id, format_unserializable_data( find_paths_unserializable_data(dict_repr, dump=JSON_DUMP) ), ) return None return json_repr @under_cached_property def as_partial_dict(self) -> dict[str, Any]: """Return a partial dict representation of the entry.""" # Convert sets and tuples to lists # so the JSON serializer does not have to do # it every time original_name = ( self.original_name_unprefixed if self.original_name_unprefixed is not None else self.original_name ) return { "area_id": self.area_id, "categories": self.categories, "config_entry_id": self.config_entry_id, "config_subentry_id": self.config_subentry_id, "created_at": self.created_at.timestamp(), "device_id": self.device_id, "disabled_by": self.disabled_by, "entity_category": self.entity_category, "entity_id": self.entity_id, "has_entity_name": self.has_entity_name, "hidden_by": self.hidden_by, "icon": self.icon, "id": self.id, "labels": list(self.labels), "modified_at": self.modified_at.timestamp(), "name": self.name, "options": self.options, "original_name": original_name, "platform": self.platform, "translation_key": self.translation_key, "unique_id": self.unique_id, } @under_cached_property def extended_dict(self) -> dict[str, Any]: """Return a extended dict representation of the entry.""" # Convert sets and tuples to lists # so the JSON serializer does not have to do # it every time return { **self.as_partial_dict, "aliases": _serialize_aliases(self.aliases), "capabilities": self.capabilities, "device_class": self.device_class, "original_device_class": self.original_device_class, "original_icon": self.original_icon, } @under_cached_property def partial_json_repr(self) -> bytes | None: """Return a cached partial JSON representation of the entry.""" try: dict_repr = self.as_partial_dict return json_bytes(dict_repr) except ValueError, TypeError: _LOGGER.error( "Unable to serialize entry %s to JSON. Bad data found at %s", self.entity_id, format_unserializable_data( find_paths_unserializable_data(dict_repr, dump=JSON_DUMP) ), ) return None @under_cached_property def as_storage_fragment(self) -> json_fragment: """Return a json fragment for storage.""" return json_fragment( json_bytes( { "aliases": self.compat_aliases, "aliases_v2": _serialize_aliases(self.aliases), "area_id": self.area_id, "categories": self.categories, "capabilities": self.capabilities, "config_entry_id": self.config_entry_id, "config_subentry_id": self.config_subentry_id, "created_at": self.created_at, "device_class": self.device_class, "device_id": self.device_id, "disabled_by": self.disabled_by, "entity_category": self.entity_category, "entity_id": self.entity_id, "hidden_by": self.hidden_by, "icon": self.icon, "id": self.id, "has_entity_name": self.has_entity_name, "labels": list(self.labels), "modified_at": self.modified_at, "name": self.name, "object_id_base": self.object_id_base, "options": self.options, "original_device_class": self.original_device_class, "original_icon": self.original_icon, "original_name": self.original_name, "platform": self.platform, "suggested_object_id": self.suggested_object_id, "supported_features": self.supported_features, "translation_key": self.translation_key, "unique_id": self.unique_id, "previous_unique_id": self.previous_unique_id, "unit_of_measurement": self.unit_of_measurement, } ) ) @callback def write_unavailable_state(self, hass: HomeAssistant) -> None: """Write the unavailable state to the state machine.""" attrs: dict[str, Any] = {ATTR_RESTORED: True} if self.capabilities is not None: attrs.update(self.capabilities) device_class = self.device_class or self.original_device_class if device_class is not None: attrs[ATTR_DEVICE_CLASS] = device_class icon = self.icon or self.original_icon if icon is not None: attrs[ATTR_ICON] = icon name = self.name or self.original_name if name is not None: attrs[ATTR_FRIENDLY_NAME] = name if self.supported_features is not None: attrs[ATTR_SUPPORTED_FEATURES] = self.supported_features if self.unit_of_measurement is not None: attrs[ATTR_UNIT_OF_MEASUREMENT] = self.unit_of_measurement hass.states.async_set(self.entity_id, STATE_UNAVAILABLE, attrs) @callback def async_get_unprefixed_name(hass: HomeAssistant, entry: RegistryEntry) -> str: """Get the entity name with device name prefix stripped, if applicable.""" name = entry.name if name is not None: if ( entry.device_id is not None and (device := dr.async_get(hass).async_get(entry.device_id)) is not None ): device_name = device.name_by_user or device.name unprefixed_name = _async_strip_prefix_from_entity_name(name, device_name) if unprefixed_name is not None: return unprefixed_name return name if entry.original_name_unprefixed is not None: return entry.original_name_unprefixed return entry.original_name or "" @callback def _async_get_full_entity_name( hass: HomeAssistant, *, device_id: str | None, fallback: str, has_entity_name: bool, name: str | None, original_name: str | None, original_name_unprefixed: str | None | UndefinedType = UNDEFINED, overridden_name: str | None = None, unprefix_name: bool = False, use_legacy_naming: bool = False, ) -> str: """Get full name for an entity. This includes the device name if appropriate. Used for both full entity name and entity ID. """ if name is None and overridden_name is not None: name = overridden_name elif not use_legacy_naming or name is None: device_name: str | None = None if ( device_id is not None and (device := dr.async_get(hass).async_get(device_id)) is not None ): device_name = device.name_by_user or device.name if name is None: if original_name_unprefixed is UNDEFINED: original_name_unprefixed = ( _async_strip_prefix_from_entity_name(original_name, device_name) if not has_entity_name else None ) name = ( original_name_unprefixed if original_name_unprefixed is not None else original_name ) elif unprefix_name: unprefixed_name = _async_strip_prefix_from_entity_name(name, device_name) if unprefixed_name is not None: name = unprefixed_name if not name: name = device_name elif device_name: name = f"{device_name} {name}" if not name: return fallback return name @callback def async_get_full_entity_name( hass: HomeAssistant, entry: RegistryEntry, original_name: str | None | UndefinedType = UNDEFINED, ) -> str: """Get full entity name for an entry.""" original_name_unprefixed: str | None | UndefinedType = UNDEFINED if original_name is UNDEFINED or original_name == entry.original_name: original_name = entry.original_name original_name_unprefixed = entry.original_name_unprefixed return _async_get_full_entity_name( hass, device_id=entry.device_id, fallback="", has_entity_name=entry.has_entity_name, name=entry.name, original_name=original_name, original_name_unprefixed=original_name_unprefixed, use_legacy_naming=True, ) @callback def async_get_entity_aliases( hass: HomeAssistant, entry: RegistryEntry, *, allow_empty: bool = True, ) -> list[str]: """Get all names/aliases for an entity. Processes entry aliases where COMPUTED_NAME entries are replaced with the computed full entity name. String entries are used as-is. The returned list preserves the order set by the user. """ entry_aliases = entry.aliases if not entry_aliases: if allow_empty: return [] entry_aliases = [COMPUTED_NAME] aliases = [] for alias in entry_aliases: if alias is COMPUTED_NAME: alias = async_get_full_entity_name(hass, entry) aliases.append(alias.strip()) return aliases @callback def _async_strip_prefix_from_entity_name( entity_name: str | None, prefix: str | None ) -> str | None: """Strip prefix from entity name. Returns None if the prefix does not meaningfully match. """ if not entity_name or not prefix: return None prefix_lower = prefix.casefold() prefix_len = len(prefix_lower) candidate = entity_name[:prefix_len] true_prefix_len = len(candidate) candidate = candidate.casefold() if not candidate.startswith(prefix_lower): return None # Casefolded string can differ in length prefix_diff = len(candidate) - prefix_len while prefix_diff > 0: true_prefix_len -= 1 prefix_diff -= len(entity_name[true_prefix_len].casefold()) # Casefolded string matched in a middle of a character, not a valid prefix if prefix_diff < 0: return None new_name = entity_name[true_prefix_len:].lstrip(" -:") if not new_name: return "" # Must have at least one separator character if len(new_name) == len(entity_name) - true_prefix_len: return None first_word = new_name.partition(" ")[0] # Preserve a mixed-case word, capitalize lowercase if not first_word.islower(): return new_name return new_name[0].upper() + new_name[1:] @callback def _unprefix_original_name( hass: HomeAssistant, original_name: str | None, has_entity_name: bool, device_id: str | None, ) -> str | None: """Calculate original_name_unprefixed.""" if has_entity_name or device_id is None: return None if (device := dr.async_get(hass).async_get(device_id)) is None: return None device_name = device.name_by_user or device.name return _async_strip_prefix_from_entity_name(original_name, device_name) @attr.s(frozen=True, slots=True) class DeletedRegistryEntry: """Deleted Entity Registry Entry.""" entity_id: str = attr.ib() unique_id: str = attr.ib() platform: str = attr.ib() aliases: list[AliasEntry] = attr.ib() area_id: str | None = attr.ib() categories: dict[str, str] = attr.ib() config_entry_id: str | None = attr.ib() config_subentry_id: str | None = attr.ib() created_at: datetime = attr.ib() device_class: str | None = attr.ib() disabled_by: RegistryEntryDisabler | UndefinedType | None = attr.ib() domain: str = attr.ib(init=False, repr=False) hidden_by: RegistryEntryHider | UndefinedType | None = attr.ib() icon: str | None = attr.ib() id: str = attr.ib() labels: set[str] = attr.ib() modified_at: datetime = attr.ib() name: str | None = attr.ib() options: ReadOnlyEntityOptionsType | UndefinedType = attr.ib( converter=_protect_optional_entity_options ) orphaned_timestamp: float | None = attr.ib() # For backwards compatibility, should be removed in the future compat_aliases: list[str] = attr.ib(factory=list, eq=False) _cache: dict[str, Any] = attr.ib(factory=dict, eq=False, init=False) @domain.default def _domain_default(self) -> str: """Compute domain value.""" return split_entity_id(self.entity_id)[0] @under_cached_property def as_storage_fragment(self) -> json_fragment: """Return a json fragment for storage.""" return json_fragment( json_bytes( { "aliases": self.compat_aliases, "aliases_v2": _serialize_aliases(self.aliases), "area_id": self.area_id, "categories": self.categories, "config_entry_id": self.config_entry_id, "config_subentry_id": self.config_subentry_id, "created_at": self.created_at, "device_class": self.device_class, "disabled_by": self.disabled_by if self.disabled_by is not UNDEFINED else None, "disabled_by_undefined": self.disabled_by is UNDEFINED, "entity_id": self.entity_id, "hidden_by": self.hidden_by if self.hidden_by is not UNDEFINED else None, "hidden_by_undefined": self.hidden_by is UNDEFINED, "icon": self.icon, "id": self.id, "labels": list(self.labels), "modified_at": self.modified_at, "name": self.name, "options": self.options if self.options is not UNDEFINED else {}, "options_undefined": self.options is UNDEFINED, "orphaned_timestamp": self.orphaned_timestamp, "platform": self.platform, "unique_id": self.unique_id, } ) ) class EntityRegistryStore(storage.Store[dict[str, list[dict[str, Any]]]]): """Store entity registry data.""" async def _async_migrate_func( # noqa: C901 self, old_major_version: int, old_minor_version: int, old_data: dict[str, list[dict[str, Any]]], ) -> dict: """Migrate to the new version.""" data = old_data if old_major_version == 1: if old_minor_version < 2: # Version 1.2 implements migration and freezes the available keys for entity in data["entities"]: # Populate keys which were introduced before version 1.2 entity.setdefault("area_id", None) entity.setdefault("capabilities", {}) entity.setdefault("config_entry_id", None) entity.setdefault("device_class", None) entity.setdefault("device_id", None) entity.setdefault("disabled_by", None) entity.setdefault("entity_category", None) entity.setdefault("icon", None) entity.setdefault("name", None) entity.setdefault("original_icon", None) entity.setdefault("original_name", None) entity.setdefault("supported_features", 0) entity.setdefault("unit_of_measurement", None) if old_minor_version < 3: # Version 1.3 adds original_device_class for entity in data["entities"]: # Move device_class to original_device_class entity["original_device_class"] = entity["device_class"] entity["device_class"] = None if old_minor_version < 4: # Version 1.4 adds id for entity in data["entities"]: entity["id"] = uuid_util.random_uuid_hex() if old_minor_version < 5: # Version 1.5 adds entity options for entity in data["entities"]: entity["options"] = {} if old_minor_version < 6: # Version 1.6 adds hidden_by for entity in data["entities"]: entity["hidden_by"] = None if old_minor_version < 7: # Version 1.7 adds has_entity_name for entity in data["entities"]: entity["has_entity_name"] = False if old_minor_version < 8: # Cleanup after frontend bug which incorrectly updated device_class # Fixed by frontend PR #13551 for entity in data["entities"]: domain = split_entity_id(entity["entity_id"])[0] if domain in [Platform.BINARY_SENSOR, Platform.COVER]: continue entity["device_class"] = None if old_minor_version < 9: # Version 1.9 adds translation_key for entity in data["entities"]: entity["translation_key"] = None if old_minor_version < 10: # Version 1.10 adds aliases for entity in data["entities"]: entity["aliases"] = [] if old_minor_version < 11: # Version 1.11 adds deleted_entities data["deleted_entities"] = data.get("deleted_entities", []) if old_minor_version < 12: # Version 1.12 adds previous_unique_id for entity in data["entities"]: entity["previous_unique_id"] = None if old_minor_version < 13: # Version 1.13 adds labels for entity in data["entities"]: entity["labels"] = [] if old_minor_version < 14: # Version 1.14 adds categories for entity in data["entities"]: entity["categories"] = {} if old_minor_version < 15: # Version 1.15 adds created_at and modified_at created_at = utc_from_timestamp(0).isoformat() for entity in data["entities"]: entity["created_at"] = entity["modified_at"] = created_at for entity in data["deleted_entities"]: entity["created_at"] = entity["modified_at"] = created_at if old_minor_version < 16: # Version 1.16 adds config_subentry_id for entity in data["entities"]: entity["config_subentry_id"] = None for entity in data["deleted_entities"]: entity["config_subentry_id"] = None if old_minor_version < 17: # Version 1.17 adds suggested_object_id for entity in data["entities"]: entity["suggested_object_id"] = None if old_minor_version < 18: # Version 1.18 adds user customizations to deleted entities for entity in data["deleted_entities"]: entity["aliases"] = [] entity["area_id"] = None entity["categories"] = {} entity["device_class"] = None entity["disabled_by"] = None entity["hidden_by"] = None entity["icon"] = None entity["labels"] = [] entity["name"] = None entity["options"] = {} if old_minor_version < 19: # Version 1.19 adds undefined flags to deleted entities, this is a bugfix # of version 1.18 set_to_undefined = old_minor_version < 18 for entity in data["deleted_entities"]: entity["disabled_by_undefined"] = set_to_undefined entity["hidden_by_undefined"] = set_to_undefined entity["options_undefined"] = set_to_undefined if old_minor_version < 20: # Version 1.20 adds object_id_base to entities for entity in data["entities"]: entity["object_id_base"] = entity["original_name"] if old_minor_version == 21: # Version 1.21 has been reverted. # It migrated entity names to the new format stored in `name_v2` # field, automatically stripping any device name prefix present. # The old name was stored in `name` field for backwards compatibility. # For users who already migrated to v1.21, we restore old names # but try to preserve any user renames made since that migration. device_registry = dr.async_get(self.hass) for entity in data["entities"]: old_name = entity["name"] name = entity.pop("name_v2") if ( (name != old_name) and (device_id := entity["device_id"]) is not None and (device := device_registry.async_get(device_id)) is not None and (device_name := device.name_by_user or device.name) ): name = f"{device_name} {name}" entity["name"] = name if old_minor_version < 22: # Version 1.22 adds support for COMPUTED_NAME in aliases and starts preserving # their order. # To avoid a major version bump, we keep the old aliases as-is and use aliases_v2 # field instead. for entity in data["entities"]: entity["aliases_v2"] = [None, *entity["aliases"]] for entity in data["deleted_entities"]: entity["aliases_v2"] = [None, *entity["aliases"]] if old_major_version > 1: raise NotImplementedError return data class EntityRegistryItems(BaseRegistryItems[RegistryEntry]): """Container for entity registry items, maps entity_id -> entry. Maintains six additional indexes: - id -> entry - (domain, platform, unique_id) -> entity_id - config_entry_id -> dict[key, True] - device_id -> dict[key, True] - area_id -> dict[key, True] - label -> dict[key, True] """ def __init__(self) -> None: """Initialize the container.""" super().__init__() self._entry_ids: dict[str, RegistryEntry] = {} self._index: dict[tuple[str, str, str], str] = {} self._config_entry_id_index: RegistryIndexType = defaultdict(dict) self._device_id_index: RegistryIndexType = defaultdict(dict) self._area_id_index: RegistryIndexType = defaultdict(dict) self._labels_index: RegistryIndexType = defaultdict(dict) def _index_entry(self, key: str, entry: RegistryEntry) -> None: """Index an entry.""" self._entry_ids[entry.id] = entry self._index[(entry.domain, entry.platform, entry.unique_id)] = entry.entity_id # python has no ordered set, so we use a dict with True values # https://discuss.python.org/t/add-orderedset-to-stdlib/12730 if (config_entry_id := entry.config_entry_id) is not None: self._config_entry_id_index[config_entry_id][key] = True if (device_id := entry.device_id) is not None: self._device_id_index[device_id][key] = True if (area_id := entry.area_id) is not None: self._area_id_index[area_id][key] = True for label in entry.labels: self._labels_index[label][key] = True def _unindex_entry( self, key: str, replacement_entry: RegistryEntry | None = None ) -> None: """Unindex an entry.""" entry = self.data[key] del self._entry_ids[entry.id] del self._index[(entry.domain, entry.platform, entry.unique_id)] if config_entry_id := entry.config_entry_id: self._unindex_entry_value(key, config_entry_id, self._config_entry_id_index) if device_id := entry.device_id: self._unindex_entry_value(key, device_id, self._device_id_index) if area_id := entry.area_id: self._unindex_entry_value(key, area_id, self._area_id_index) if labels := entry.labels: for label in labels: self._unindex_entry_value(key, label, self._labels_index) def get_device_ids(self) -> KeysView[str]: """Return device ids.""" return self._device_id_index.keys() def get_entity_id(self, key: tuple[str, str, str]) -> str | None: """Get entity_id from (domain, platform, unique_id).""" return self._index.get(key) def get_entry(self, key: str) -> RegistryEntry | None: """Get entry from id.""" return self._entry_ids.get(key) def get_entries_for_device_id( self, device_id: str, include_disabled_entities: bool = False ) -> list[RegistryEntry]: """Get entries for device.""" data = self.data return [ entry for key in self._device_id_index.get(device_id, ()) if not (entry := data[key]).disabled_by or include_disabled_entities ] def get_entries_for_config_entry_id( self, config_entry_id: str ) -> list[RegistryEntry]: """Get entries for config entry.""" data = self.data return [ data[key] for key in self._config_entry_id_index.get(config_entry_id, ()) ] def get_entries_for_area_id(self, area_id: str) -> list[RegistryEntry]: """Get entries for area.""" data = self.data return [data[key] for key in self._area_id_index.get(area_id, ())] def get_entries_for_label(self, label: str) -> list[RegistryEntry]: """Get entries for label.""" data = self.data return [data[key] for key in self._labels_index.get(label, ())] def _validate_item( hass: HomeAssistant, domain: str, platform: str, *, config_entry_id: str | None | UndefinedType = None, config_subentry_id: str | None | UndefinedType = None, device_id: str | None | UndefinedType = None, disabled_by: RegistryEntryDisabler | None | UndefinedType = None, entity_category: EntityCategory | None | UndefinedType = None, hidden_by: RegistryEntryHider | None | UndefinedType = None, old_config_subentry_id: str | None = None, report_non_string_unique_id: bool = True, unique_id: str | Hashable | UndefinedType | Any, ) -> None: """Validate entity registry item.""" if unique_id is not UNDEFINED and not isinstance(unique_id, Hashable): raise TypeError(f"unique_id must be a string, got {unique_id}") if ( report_non_string_unique_id and unique_id is not UNDEFINED and not isinstance(unique_id, str) ): # In HA Core 2025.10, we should fail if unique_id is not a string report_issue = async_suggest_report_issue(hass, integration_domain=platform) _LOGGER.error( "'%s' from integration %s has a non string unique_id '%s', please %s", domain, platform, unique_id, report_issue, ) if config_entry_id and config_entry_id is not UNDEFINED: if not hass.config_entries.async_get_entry(config_entry_id): raise ValueError( f"Can't link entity to unknown config entry {config_entry_id}" ) if ( config_entry_id and config_entry_id is not UNDEFINED and old_config_subentry_id and config_subentry_id is UNDEFINED ): raise ValueError("Can't change config entry without changing subentry") if ( config_entry_id and config_entry_id is not UNDEFINED and config_subentry_id and config_subentry_id is not UNDEFINED ): if ( not (config_entry := hass.config_entries.async_get_entry(config_entry_id)) or config_subentry_id not in config_entry.subentries ): raise ValueError( f"Config entry {config_entry_id} has no subentry {config_subentry_id}" ) if device_id and device_id is not UNDEFINED: device_registry = dr.async_get(hass) if not device_registry.async_get(device_id): raise ValueError(f"Device {device_id} does not exist") if ( disabled_by and disabled_by is not UNDEFINED and not isinstance(disabled_by, RegistryEntryDisabler) ): raise ValueError( f"disabled_by must be a RegistryEntryDisabler value, got {disabled_by}" ) if ( entity_category and entity_category is not UNDEFINED and not isinstance(entity_category, EntityCategory) ): raise ValueError( f"entity_category must be a valid EntityCategory instance, got {entity_category}" ) if ( hidden_by and hidden_by is not UNDEFINED and not isinstance(hidden_by, RegistryEntryHider) ): raise ValueError( f"hidden_by must be a RegistryEntryHider value, got {hidden_by}" ) class EntityRegistry(BaseRegistry): """Class to hold a registry of entities.""" deleted_entities: dict[tuple[str, str, str], DeletedRegistryEntry] entities: EntityRegistryItems _entities_data: dict[str, RegistryEntry] def __init__(self, hass: HomeAssistant) -> None: """Initialize the registry.""" self.hass = hass self._store = EntityRegistryStore( hass, STORAGE_VERSION_MAJOR, STORAGE_KEY, atomic_writes=True, minor_version=STORAGE_VERSION_MINOR, serialize_in_event_loop=False, ) self.hass.bus.async_listen( EVENT_DEVICE_REGISTRY_UPDATED, self.async_device_modified, ) @callback def async_is_registered(self, entity_id: str) -> bool: """Check if an entity_id is currently registered.""" return entity_id in self.entities @callback def async_get(self, entity_id_or_uuid: str) -> RegistryEntry | None: """Get EntityEntry for an entity_id or entity entry id. We retrieve the RegistryEntry from the underlying dict to avoid the overhead of the UserDict __getitem__. """ return self._entities_data.get(entity_id_or_uuid) or self.entities.get_entry( entity_id_or_uuid ) @callback def async_get_entity_id( self, domain: str, platform: str, unique_id: str ) -> str | None: """Check if an entity_id is currently registered.""" return self.entities.get_entity_id((domain, platform, unique_id)) @callback def async_device_ids(self) -> list[str]: """Return known device ids.""" return list(self.entities.get_device_ids()) def _entity_id_available(self, entity_id: str) -> bool: """Return True if the entity_id is available. An entity_id is available if: - It's not registered - It's available (not in the state machine and not reserved) Note that an entity_id which belongs to a deleted entity is considered available. """ return entity_id not in self.entities and self.hass.states.async_available( entity_id ) @callback def async_generate_entity_id( self, domain: str, suggested_object_id: str, *, current_entity_id: str | None = None, reserved_entity_ids: set[str] | None = None, ) -> str: """Get available entity ID. This function is deprecated. Use `async_get_available_entity_id` instead. Entity ID conflicts are checked against registered and currently existing entities, as well as provided `reserved_entity_ids`. """ report_usage( "calls `entity_registry.async_generate_entity_id`, " "which is deprecated and will be removed in Home Assistant 2027.2; " "use `entity_registry.async_get_available_entity_id` instead", core_behavior=ReportBehavior.LOG, breaks_in_ha_version="2027.2.0", ) return self.async_get_available_entity_id( domain, suggested_object_id, current_entity_id=current_entity_id, reserved_entity_ids=reserved_entity_ids, ) @callback def async_get_available_entity_id( self, domain: str, suggested_object_id: str, *, current_entity_id: str | None = None, reserved_entity_ids: set[str] | None = None, ) -> str: """Get next available entity ID. Entity ID conflicts are checked against registered and currently existing entities, as well as provided `reserved_entity_ids`. """ preferred_string = f"{domain}.{slugify(suggested_object_id)}" if len(domain) > MAX_LENGTH_STATE_DOMAIN: raise MaxLengthExceeded(domain, "domain", MAX_LENGTH_STATE_DOMAIN) test_string = preferred_string[:MAX_LENGTH_STATE_ENTITY_ID] tries = 1 while ( not self._entity_id_available(test_string) and test_string != current_entity_id ) or (reserved_entity_ids and test_string in reserved_entity_ids): tries += 1 len_suffix = len(str(tries)) + 1 test_string = ( f"{preferred_string[: MAX_LENGTH_STATE_ENTITY_ID - len_suffix]}_{tries}" ) return test_string def _async_generate_entity_id( self, *, current_entity_id: str | None, device_id: str | None, domain: str, has_entity_name: bool, name: str | None, object_id_base: str | None, platform: str, reserved_entity_ids: set[str] | None = None, suggested_object_id: str | None, unique_id: str, ) -> str: """Generate an entity ID, based on all the provided parameters. `name` is the name set by the user, not the original name from the integration. `name` has priority over `suggested_object_id`, which has priority over `object_id_base`. `name` will always be prefixed with the device name. `suggested_object_id` will not be prefixed with the device name. `object_id_base` will be prefixed with the device name if `has_entity_name` is True. Entity ID conflicts are checked against registered and currently existing entities, as well as provided `reserved_entity_ids`. """ object_id = _async_get_full_entity_name( self.hass, device_id=device_id, fallback=f"{platform}_{unique_id}", has_entity_name=has_entity_name, name=name, original_name=object_id_base, overridden_name=suggested_object_id, unprefix_name=True, ) return self.async_get_available_entity_id( domain, object_id, current_entity_id=current_entity_id, reserved_entity_ids=reserved_entity_ids, ) @callback def async_regenerate_entity_id( self, entry: RegistryEntry, *, reserved_entity_ids: set[str] | None = None, ) -> str: """Regenerate an entity ID for an entry. Entity ID conflicts are checked against registered and currently existing entities, as well as provided `reserved_entity_ids`. """ return self._async_generate_entity_id( current_entity_id=entry.entity_id, device_id=entry.device_id, domain=entry.domain, has_entity_name=entry.has_entity_name, name=entry.name, object_id_base=entry.object_id_base, platform=entry.platform, reserved_entity_ids=reserved_entity_ids, suggested_object_id=entry.suggested_object_id, unique_id=entry.unique_id, ) @callback def async_get_or_create( self, domain: str, platform: str, unique_id: str, *, # Used for entity ID generation, if entity gets created. # `suggested_object_id` has priority over `object_id_base`. object_id_base: str | None | UndefinedType = UNDEFINED, suggested_object_id: str | None | UndefinedType = UNDEFINED, # To disable or hide an entity if it gets created, does not affect # existing entities disabled_by: RegistryEntryDisabler | None = None, hidden_by: RegistryEntryHider | None = None, # Function to generate initial entity options if it gets created get_initial_options: Callable[[], EntityOptionsType | None] | None = None, # Data that we want entry to have capabilities: Mapping[str, Any] | None | UndefinedType = UNDEFINED, config_entry: ConfigEntry | None | UndefinedType = UNDEFINED, config_subentry_id: str | None | UndefinedType = UNDEFINED, device_id: str | None | UndefinedType = UNDEFINED, entity_category: EntityCategory | UndefinedType | None = UNDEFINED, has_entity_name: bool | UndefinedType = UNDEFINED, original_device_class: str | None | UndefinedType = UNDEFINED, original_icon: str | None | UndefinedType = UNDEFINED, original_name: str | None | UndefinedType = UNDEFINED, supported_features: int | None | UndefinedType = UNDEFINED, translation_key: str | None | UndefinedType = UNDEFINED, unit_of_measurement: str | None | UndefinedType = UNDEFINED, ) -> RegistryEntry: """Get entity. Create if it doesn't exist.""" config_entry_id: str | None | UndefinedType = UNDEFINED if not config_entry: config_entry_id = None elif config_entry is not UNDEFINED: config_entry_id = config_entry.entry_id supported_features = supported_features or 0 entity_id = self.async_get_entity_id(domain, platform, unique_id) if entity_id: return self._async_update_entity( entity_id, capabilities=capabilities, config_entry_id=config_entry_id, config_subentry_id=config_subentry_id, device_id=device_id, entity_category=entity_category, has_entity_name=has_entity_name, object_id_base=object_id_base, original_device_class=original_device_class, original_icon=original_icon, original_name=original_name, suggested_object_id=suggested_object_id, supported_features=supported_features, translation_key=translation_key, unit_of_measurement=unit_of_measurement, ) self.hass.verify_event_loop_thread("entity_registry.async_get_or_create") _validate_item( self.hass, domain, platform, config_entry_id=config_entry_id, config_subentry_id=config_subentry_id, device_id=device_id, disabled_by=disabled_by, entity_category=entity_category, hidden_by=hidden_by, unique_id=unique_id, ) entity_registry_id: str | None = None created_at = utcnow() deleted_entity = self.deleted_entities.pop((domain, platform, unique_id), None) options: Mapping[str, Mapping[str, Any]] | None if deleted_entity is not None: aliases = deleted_entity.aliases area_id = deleted_entity.area_id categories = deleted_entity.categories compat_aliases = deleted_entity.compat_aliases created_at = deleted_entity.created_at device_class = deleted_entity.device_class if deleted_entity.disabled_by is not UNDEFINED: disabled_by = deleted_entity.disabled_by # Adjust disabled_by based on config entry state if config_entry and config_entry is not UNDEFINED: if config_entry.disabled_by: if disabled_by is None: disabled_by = RegistryEntryDisabler.CONFIG_ENTRY elif disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: disabled_by = None elif disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: disabled_by = None # Restore entity_id if it's available if self._entity_id_available(deleted_entity.entity_id): entity_id = deleted_entity.entity_id entity_registry_id = deleted_entity.id if deleted_entity.hidden_by is not UNDEFINED: hidden_by = deleted_entity.hidden_by icon = deleted_entity.icon labels = deleted_entity.labels name = deleted_entity.name if deleted_entity.options is not UNDEFINED: options = deleted_entity.options else: options = get_initial_options() if get_initial_options else None else: aliases = [COMPUTED_NAME] area_id = None categories = {} compat_aliases = [] device_class = None icon = None labels = set() name = None options = get_initial_options() if get_initial_options else None def none_if_undefined[_T](value: _T | UndefinedType) -> _T | None: """Return None if value is UNDEFINED, otherwise return value.""" return None if value is UNDEFINED else value device_id = none_if_undefined(device_id) has_entity_name_bool = none_if_undefined(has_entity_name) or False object_id_base = none_if_undefined(object_id_base) suggested_object_id = none_if_undefined(suggested_object_id) if entity_id is None: entity_id = self._async_generate_entity_id( current_entity_id=None, device_id=device_id, domain=domain, has_entity_name=has_entity_name_bool, name=name, object_id_base=object_id_base, platform=platform, suggested_object_id=suggested_object_id, unique_id=unique_id, ) original_name = none_if_undefined(original_name) original_name_unprefixed = _unprefix_original_name( self.hass, original_name, has_entity_name_bool, device_id ) if ( disabled_by is None and config_entry and config_entry is not UNDEFINED and config_entry.pref_disable_new_entities ): disabled_by = RegistryEntryDisabler.INTEGRATION entry = RegistryEntry( aliases=aliases, area_id=area_id, categories=categories, capabilities=none_if_undefined(capabilities), compat_aliases=compat_aliases, config_entry_id=none_if_undefined(config_entry_id), config_subentry_id=none_if_undefined(config_subentry_id), created_at=created_at, device_class=device_class, device_id=device_id, disabled_by=disabled_by, entity_category=none_if_undefined(entity_category), entity_id=entity_id, hidden_by=hidden_by, has_entity_name=has_entity_name_bool, icon=icon, id=entity_registry_id, labels=labels, name=name, object_id_base=object_id_base, options=options, original_device_class=none_if_undefined(original_device_class), original_icon=none_if_undefined(original_icon), original_name=original_name, original_name_unprefixed=original_name_unprefixed, platform=platform, suggested_object_id=suggested_object_id, supported_features=none_if_undefined(supported_features) or 0, translation_key=none_if_undefined(translation_key), unique_id=unique_id, unit_of_measurement=none_if_undefined(unit_of_measurement), ) self.entities[entity_id] = entry _LOGGER.info("Registered new %s.%s entity: %s", domain, platform, entity_id) self.async_schedule_save() self.hass.bus.async_fire_internal( EVENT_ENTITY_REGISTRY_UPDATED, _EventEntityRegistryUpdatedData_CreateRemove( action="create", entity_id=entity_id ), ) return entry @callback def async_remove(self, entity_id: str) -> None: """Remove an entity from registry.""" self.hass.verify_event_loop_thread("entity_registry.async_remove") if entity_id not in self.entities: # Allow attempts to remove an entity which does not exist. If this is # not allowed, there will be races during cleanup where we iterate over # lists of entities to remove, but there are listeners for entity # registry events which delete entities at the same time. # For example, if we clean up entities A and B, there might be a listener # which deletes entity B when entity A is being removed. return entity = self.entities.pop(entity_id) config_entry_id = entity.config_entry_id key = (entity.domain, entity.platform, entity.unique_id) # If the entity does not belong to a config entry, mark it as orphaned orphaned_timestamp = None if config_entry_id else time.time() self.deleted_entities[key] = DeletedRegistryEntry( aliases=entity.aliases, area_id=entity.area_id, categories=entity.categories, compat_aliases=entity.compat_aliases, config_entry_id=config_entry_id, config_subentry_id=entity.config_subentry_id, created_at=entity.created_at, device_class=entity.device_class, disabled_by=entity.disabled_by, entity_id=entity_id, hidden_by=entity.hidden_by, icon=entity.icon, id=entity.id, labels=entity.labels, modified_at=utcnow(), name=entity.name, options=entity.options, orphaned_timestamp=orphaned_timestamp, platform=entity.platform, unique_id=entity.unique_id, ) self.hass.bus.async_fire_internal( EVENT_ENTITY_REGISTRY_UPDATED, _EventEntityRegistryUpdatedData_CreateRemove( action="remove", entity_id=entity_id ), ) self.async_schedule_save() @callback def async_device_modified( self, event: Event[EventDeviceRegistryUpdatedData] ) -> None: """Handle the removal or update of a device. Remove entities from the registry that are associated to a device when the device is removed. Disable entities in the registry that are associated to a device when the device is disabled. """ if event.data["action"] == "remove": entities = async_entries_for_device( self, event.data["device_id"], include_disabled_entities=True ) removed_device_dict = event.data["device"] for entity in entities: config_entry_id = entity.config_entry_id if ( config_entry_id in removed_device_dict["config_entries"] and entity.config_subentry_id in removed_device_dict["config_entries_subentries"][config_entry_id] ): self.async_remove(entity.entity_id) else: if entity.entity_id not in self.entities: # Entity has been removed already, skip it continue self.async_update_entity(entity.entity_id, device_id=None) return if event.data["action"] != "update": # Ignore "create" action return device_registry = dr.async_get(self.hass) device = device_registry.async_get(event.data["device_id"]) # The device may be deleted already if the event handling is late, do nothing # in that case. Entities will be removed when we get the "remove" event. if not device: return changes = event.data["changes"] # Remove entities which belong to config entries no longer associated with the # device if old_config_entries := changes.get("config_entries"): entities = async_entries_for_device( self, event.data["device_id"], include_disabled_entities=True ) for entity in entities: config_entry_id = entity.config_entry_id if ( entity.config_entry_id in old_config_entries and entity.config_entry_id not in device.config_entries ): self.async_remove(entity.entity_id) # Remove entities which belong to config subentries no longer associated with the # device if old_config_entries_subentries := changes.get("config_entries_subentries"): entities = async_entries_for_device( self, event.data["device_id"], include_disabled_entities=True ) for entity in entities: config_entry_id = entity.config_entry_id config_subentry_id = entity.config_subentry_id if ( config_entry_id in device.config_entries and config_entry_id in old_config_entries_subentries and config_subentry_id in old_config_entries_subentries[config_entry_id] and config_subentry_id not in device.config_entries_subentries[config_entry_id] ): self.async_remove(entity.entity_id) # Update name if device name changed if (by_user := "name_by_user" in changes) or "name" in changes: entities = async_entries_for_device( self, event.data["device_id"], include_disabled_entities=True ) device_name = device.name_by_user or device.name for entity in entities: if entity.has_entity_name: continue # When a user renames a device, update entity names to reflect # the new device name. # An empty name_unprefixed means the entity name equals # the device name (e.g. a main sensor); a non-empty one # is appended as a suffix. name: str | None | UndefinedType = UNDEFINED if ( by_user and entity.name is None and (name_unprefixed := entity.original_name_unprefixed) is not None ): if not name_unprefixed: name = device_name elif device_name: name = f"{device_name} {name_unprefixed}" original_name_unprefixed = _async_strip_prefix_from_entity_name( entity.original_name, device_name ) self._async_update_entity( entity.entity_id, name=name, original_name_unprefixed=original_name_unprefixed, ) # Re-enable disabled entities if the device is no longer disabled if not device.disabled: entities = async_entries_for_device( self, event.data["device_id"], include_disabled_entities=True ) for entity in entities: if entity.disabled_by is not RegistryEntryDisabler.DEVICE: continue self.async_update_entity(entity.entity_id, disabled_by=None) return # Ignore device disabled by config entry, this is handled by # async_config_entry_disabled_by_changed if device.disabled_by is dr.DeviceEntryDisabler.CONFIG_ENTRY: return # Fetch entities which are not already disabled and disable them entities = async_entries_for_device(self, event.data["device_id"]) for entity in entities: self.async_update_entity( entity.entity_id, disabled_by=RegistryEntryDisabler.DEVICE ) @callback def _async_update_entity( self, entity_id: str, *, aliases: list[AliasEntry] | UndefinedType = UNDEFINED, area_id: str | None | UndefinedType = UNDEFINED, categories: dict[str, str] | UndefinedType = UNDEFINED, capabilities: Mapping[str, Any] | None | UndefinedType = UNDEFINED, config_entry_id: str | None | UndefinedType = UNDEFINED, config_subentry_id: str | None | UndefinedType = UNDEFINED, device_class: str | None | UndefinedType = UNDEFINED, device_id: str | None | UndefinedType = UNDEFINED, disabled_by: RegistryEntryDisabler | None | UndefinedType = UNDEFINED, entity_category: EntityCategory | None | UndefinedType = UNDEFINED, hidden_by: RegistryEntryHider | None | UndefinedType = UNDEFINED, icon: str | None | UndefinedType = UNDEFINED, has_entity_name: bool | UndefinedType = UNDEFINED, labels: set[str] | UndefinedType = UNDEFINED, name: str | None | UndefinedType = UNDEFINED, new_entity_id: str | UndefinedType = UNDEFINED, new_unique_id: str | UndefinedType = UNDEFINED, object_id_base: str | None | UndefinedType = UNDEFINED, options: EntityOptionsType | UndefinedType = UNDEFINED, original_device_class: str | None | UndefinedType = UNDEFINED, original_icon: str | None | UndefinedType = UNDEFINED, original_name: str | None | UndefinedType = UNDEFINED, original_name_unprefixed: str | None | UndefinedType = UNDEFINED, platform: str | None | UndefinedType = UNDEFINED, suggested_object_id: str | None | UndefinedType = UNDEFINED, supported_features: int | UndefinedType = UNDEFINED, translation_key: str | None | UndefinedType = UNDEFINED, unit_of_measurement: str | None | UndefinedType = UNDEFINED, ) -> RegistryEntry: """Private facing update properties method.""" old = self.entities[entity_id] new_values: dict[str, Any] = {} # Dict with new key/value pairs old_values: dict[str, Any] = {} # Dict with old key/value pairs for attr_name, value in ( ("aliases", aliases), ("area_id", area_id), ("categories", categories), ("capabilities", capabilities), ("config_entry_id", config_entry_id), ("config_subentry_id", config_subentry_id), ("device_class", device_class), ("device_id", device_id), ("disabled_by", disabled_by), ("entity_category", entity_category), ("hidden_by", hidden_by), ("icon", icon), ("has_entity_name", has_entity_name), ("labels", labels), ("name", name), ("object_id_base", object_id_base), ("options", options), ("original_device_class", original_device_class), ("original_icon", original_icon), ("original_name", original_name), ("original_name_unprefixed", original_name_unprefixed), ("platform", platform), ("suggested_object_id", suggested_object_id), ("supported_features", supported_features), ("translation_key", translation_key), ("unit_of_measurement", unit_of_measurement), ): if value is not UNDEFINED and value != getattr(old, attr_name): new_values[attr_name] = value old_values[attr_name] = getattr(old, attr_name) # Only validate if data has changed if new_values or new_unique_id is not UNDEFINED: _validate_item( self.hass, old.domain, old.platform, config_entry_id=config_entry_id, config_subentry_id=config_subentry_id, device_id=device_id, disabled_by=disabled_by, entity_category=entity_category, hidden_by=hidden_by, old_config_subentry_id=old.config_subentry_id, unique_id=new_unique_id, ) if disabled_by is UNDEFINED and config_entry_id is not UNDEFINED: if config_entry_id: config_entry = self.hass.config_entries.async_get_entry(config_entry_id) if TYPE_CHECKING: # We've checked the config_entry exists in _validate_item assert config_entry is not None if config_entry.disabled_by: if old.disabled_by is None: new_values["disabled_by"] = RegistryEntryDisabler.CONFIG_ENTRY elif old.disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: new_values["disabled_by"] = None elif old.disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: new_values["disabled_by"] = None if new_entity_id is not UNDEFINED and new_entity_id != old.entity_id: if not self._entity_id_available(new_entity_id): raise ValueError("Entity with this ID is already registered") if not valid_entity_id(new_entity_id): raise ValueError("Invalid entity ID") if split_entity_id(new_entity_id)[0] != split_entity_id(entity_id)[0]: raise ValueError("New entity ID should be same domain") self.entities.pop(entity_id) entity_id = new_values["entity_id"] = new_entity_id old_values["entity_id"] = old.entity_id if new_unique_id is not UNDEFINED: conflict_entity_id = self.async_get_entity_id( old.domain, old.platform, new_unique_id ) if conflict_entity_id: raise ValueError( f"Unique id '{new_unique_id}' is already in use by " f"'{conflict_entity_id}'" ) new_values["unique_id"] = new_unique_id old_values["unique_id"] = old.unique_id new_values["previous_unique_id"] = old.unique_id if not new_values: return old new_values["modified_at"] = utcnow() self.hass.verify_event_loop_thread("entity_registry.async_update_entity") if original_name_unprefixed is UNDEFINED and ( original_name is not UNDEFINED or device_id is not UNDEFINED or has_entity_name is not UNDEFINED ): device_id = device_id if device_id is not UNDEFINED else old.device_id has_entity_name = ( has_entity_name if has_entity_name is not UNDEFINED else old.has_entity_name ) original_name = ( original_name if original_name is not UNDEFINED else old.original_name ) original_name_unprefixed = _unprefix_original_name( self.hass, original_name, has_entity_name, device_id ) new_values["original_name_unprefixed"] = original_name_unprefixed new = self.entities[entity_id] = attr.evolve(old, **new_values) self.async_schedule_save() old_values.pop("original_name_unprefixed", None) data: _EventEntityRegistryUpdatedData_Update = { "action": "update", "entity_id": entity_id, "changes": old_values, } if old.entity_id != entity_id: data["old_entity_id"] = old.entity_id self.hass.bus.async_fire_internal(EVENT_ENTITY_REGISTRY_UPDATED, data) return new @callback def async_update_entity( self, entity_id: str, *, aliases: list[AliasEntry] | UndefinedType = UNDEFINED, area_id: str | None | UndefinedType = UNDEFINED, categories: dict[str, str] | UndefinedType = UNDEFINED, capabilities: Mapping[str, Any] | None | UndefinedType = UNDEFINED, config_entry_id: str | None | UndefinedType = UNDEFINED, config_subentry_id: str | None | UndefinedType = UNDEFINED, device_class: str | None | UndefinedType = UNDEFINED, device_id: str | None | UndefinedType = UNDEFINED, disabled_by: RegistryEntryDisabler | None | UndefinedType = UNDEFINED, entity_category: EntityCategory | None | UndefinedType = UNDEFINED, hidden_by: RegistryEntryHider | None | UndefinedType = UNDEFINED, icon: str | None | UndefinedType = UNDEFINED, has_entity_name: bool | UndefinedType = UNDEFINED, labels: set[str] | UndefinedType = UNDEFINED, name: str | None | UndefinedType = UNDEFINED, new_entity_id: str | UndefinedType = UNDEFINED, new_unique_id: str | UndefinedType = UNDEFINED, original_device_class: str | None | UndefinedType = UNDEFINED, original_icon: str | None | UndefinedType = UNDEFINED, original_name: str | None | UndefinedType = UNDEFINED, supported_features: int | UndefinedType = UNDEFINED, translation_key: str | None | UndefinedType = UNDEFINED, unit_of_measurement: str | None | UndefinedType = UNDEFINED, ) -> RegistryEntry: """Update properties of an entity.""" return self._async_update_entity( entity_id, aliases=aliases, area_id=area_id, categories=categories, capabilities=capabilities, config_entry_id=config_entry_id, config_subentry_id=config_subentry_id, device_class=device_class, device_id=device_id, disabled_by=disabled_by, entity_category=entity_category, hidden_by=hidden_by, icon=icon, has_entity_name=has_entity_name, labels=labels, name=name, new_entity_id=new_entity_id, new_unique_id=new_unique_id, original_device_class=original_device_class, original_icon=original_icon, original_name=original_name, supported_features=supported_features, translation_key=translation_key, unit_of_measurement=unit_of_measurement, ) @callback def async_update_entity_platform( self, entity_id: str, new_platform: str, *, new_config_entry_id: str | UndefinedType = UNDEFINED, new_config_subentry_id: str | UndefinedType = UNDEFINED, new_unique_id: str | UndefinedType = UNDEFINED, new_device_id: str | None | UndefinedType = UNDEFINED, ) -> RegistryEntry: """Update entity platform. This should only be used when an entity needs to be migrated between integrations. """ if ( state := self.hass.states.get(entity_id) ) is not None and state.state != STATE_UNKNOWN: raise ValueError("Only entities that haven't been loaded can be migrated") old = self.entities[entity_id] if new_config_entry_id == UNDEFINED and old.config_entry_id is not None: raise ValueError( f"new_config_entry_id required because {entity_id} is already linked " "to a config entry" ) return self._async_update_entity( entity_id, new_unique_id=new_unique_id, config_entry_id=new_config_entry_id, config_subentry_id=new_config_subentry_id, device_id=new_device_id, platform=new_platform, ) @callback def async_update_entity_options( self, entity_id: str, domain: str, options: Mapping[str, Any] | None ) -> RegistryEntry: """Update entity options for a domain. If the domain options are set to None, they will be removed. """ old = self.entities[entity_id] new_options: dict[str, Mapping] = { key: value for key, value in old.options.items() if key != domain } if options is not None: new_options[domain] = options return self._async_update_entity(entity_id, options=new_options) async def _async_load(self) -> None: """Load the entity registry.""" # Device registry must be loaded before entity registry because # migration and entity processing reference device names. await dr.async_get(self.hass).async_wait_loaded() _async_setup_cleanup(self.hass, self) _async_setup_entity_restore(self.hass, self) data = await self._store.async_load() entities = EntityRegistryItems() deleted_entities: dict[tuple[str, str, str], DeletedRegistryEntry] = {} if data is not None: for entity in data["entities"]: try: domain = split_entity_id(entity["entity_id"])[0] _validate_item( self.hass, domain, entity["platform"], report_non_string_unique_id=False, unique_id=entity["unique_id"], ) except (TypeError, ValueError) as err: report_issue = async_suggest_report_issue( self.hass, integration_domain=entity["platform"] ) _LOGGER.error( ( "Entity registry entry '%s' from integration %s could not " "be loaded: '%s', please %s" ), entity["entity_id"], entity["platform"], str(err), report_issue, ) continue original_name_unprefixed = _unprefix_original_name( self.hass, entity["original_name"], entity["has_entity_name"], entity["device_id"], ) entities[entity["entity_id"]] = RegistryEntry( aliases=_deserialize_aliases(entity["aliases_v2"]), area_id=entity["area_id"], categories=entity["categories"], capabilities=entity["capabilities"], compat_aliases=entity["aliases"], config_entry_id=entity["config_entry_id"], config_subentry_id=entity["config_subentry_id"], created_at=datetime.fromisoformat(entity["created_at"]), device_class=entity["device_class"], device_id=entity["device_id"], disabled_by=RegistryEntryDisabler(entity["disabled_by"]) if entity["disabled_by"] else None, entity_category=EntityCategory(entity["entity_category"]) if entity["entity_category"] else None, entity_id=entity["entity_id"], hidden_by=RegistryEntryHider(entity["hidden_by"]) if entity["hidden_by"] else None, icon=entity["icon"], id=entity["id"], has_entity_name=entity["has_entity_name"], labels=set(entity["labels"]), modified_at=datetime.fromisoformat(entity["modified_at"]), name=entity["name"], object_id_base=entity.get("object_id_base"), options=entity["options"], original_device_class=entity["original_device_class"], original_icon=entity["original_icon"], original_name=entity["original_name"], original_name_unprefixed=original_name_unprefixed, platform=entity["platform"], suggested_object_id=entity["suggested_object_id"], supported_features=entity["supported_features"], translation_key=entity["translation_key"], unique_id=entity["unique_id"], previous_unique_id=entity["previous_unique_id"], unit_of_measurement=entity["unit_of_measurement"], ) def get_optional_enum[_EnumT: StrEnum]( cls: type[_EnumT], value: str | None, undefined: bool ) -> _EnumT | UndefinedType | None: """Convert string to the passed enum, UNDEFINED or None.""" if undefined: return UNDEFINED if value is None: return None try: return cls(value) except ValueError: return None for entity in data["deleted_entities"]: try: domain = split_entity_id(entity["entity_id"])[0] _validate_item( self.hass, domain, entity["platform"], report_non_string_unique_id=False, unique_id=entity["unique_id"], ) except TypeError, ValueError: continue key = ( split_entity_id(entity["entity_id"])[0], entity["platform"], entity["unique_id"], ) deleted_entities[key] = DeletedRegistryEntry( aliases=_deserialize_aliases(entity["aliases_v2"]), area_id=entity["area_id"], categories=entity["categories"], compat_aliases=entity["aliases"], config_entry_id=entity["config_entry_id"], config_subentry_id=entity["config_subentry_id"], created_at=datetime.fromisoformat(entity["created_at"]), device_class=entity["device_class"], disabled_by=get_optional_enum( RegistryEntryDisabler, entity["disabled_by"], entity["disabled_by_undefined"], ), entity_id=entity["entity_id"], hidden_by=get_optional_enum( RegistryEntryHider, entity["hidden_by"], entity["hidden_by_undefined"], ), icon=entity["icon"], id=entity["id"], labels=set(entity["labels"]), modified_at=datetime.fromisoformat(entity["modified_at"]), name=entity["name"], options=entity["options"] if not entity["options_undefined"] else UNDEFINED, orphaned_timestamp=entity["orphaned_timestamp"], platform=entity["platform"], unique_id=entity["unique_id"], ) self.deleted_entities = deleted_entities self.entities = entities self._entities_data = entities.data def _data_to_save(self) -> dict[str, Any]: """Return data of entity registry to store in a file.""" # Create intermediate lists to allow this method to be called from a thread # other than the event loop. return { "entities": [ entry.as_storage_fragment for entry in list(self.entities.values()) ], "deleted_entities": [ entry.as_storage_fragment for entry in list(self.deleted_entities.values()) ], } @callback def async_clear_category_id(self, scope: str, category_id: str) -> None: """Clear category id from registry entries.""" for entity_id, entry in self.entities.items(): if ( existing_category_id := entry.categories.get(scope) ) and category_id == existing_category_id: categories = entry.categories.copy() del categories[scope] self.async_update_entity(entity_id, categories=categories) for key, deleted_entity in list(self.deleted_entities.items()): if ( existing_category_id := deleted_entity.categories.get(scope) ) and category_id == existing_category_id: categories = deleted_entity.categories.copy() del categories[scope] self.deleted_entities[key] = attr.evolve( deleted_entity, categories=categories ) self.async_schedule_save() @callback def async_clear_label_id(self, label_id: str) -> None: """Clear label from registry entries.""" for entry in self.entities.get_entries_for_label(label_id): self.async_update_entity(entry.entity_id, labels=entry.labels - {label_id}) for key, deleted_entity in list(self.deleted_entities.items()): if label_id not in deleted_entity.labels: continue self.deleted_entities[key] = attr.evolve( deleted_entity, labels=deleted_entity.labels - {label_id} ) self.async_schedule_save() @callback def async_clear_config_entry(self, config_entry_id: str) -> None: """Clear config entry from registry entries.""" now_time = time.time() for entity_id in [ entry.entity_id for entry in self.entities.get_entries_for_config_entry_id(config_entry_id) ]: self.async_remove(entity_id) for key, deleted_entity in list(self.deleted_entities.items()): if config_entry_id != deleted_entity.config_entry_id: continue # Add a time stamp when the deleted entity became orphaned self.deleted_entities[key] = attr.evolve( deleted_entity, orphaned_timestamp=now_time, config_entry_id=None ) self.async_schedule_save() @callback def async_clear_config_subentry( self, config_entry_id: str, config_subentry_id: str ) -> None: """Clear config subentry from registry entries.""" now_time = time.time() for entity_id in [ entry.entity_id for entry in self.entities.get_entries_for_config_entry_id(config_entry_id) if entry.config_subentry_id == config_subentry_id ]: self.async_remove(entity_id) for key, deleted_entity in list(self.deleted_entities.items()): if config_subentry_id != deleted_entity.config_subentry_id: continue # Add a time stamp when the deleted entity became orphaned self.deleted_entities[key] = attr.evolve( deleted_entity, orphaned_timestamp=now_time, config_entry_id=None, config_subentry_id=None, ) self.async_schedule_save() @callback def async_purge_expired_orphaned_entities(self) -> None: """Purge expired orphaned entities from the registry. We need to purge these periodically to avoid the database growing without bound. """ now_time = time.time() for key, deleted_entity in list(self.deleted_entities.items()): if (orphaned_timestamp := deleted_entity.orphaned_timestamp) is None: continue if orphaned_timestamp + ORPHANED_ENTITY_KEEP_SECONDS < now_time: self.deleted_entities.pop(key) self.async_schedule_save() @callback def async_clear_area_id(self, area_id: str) -> None: """Clear area id from registry entries.""" for entry in self.entities.get_entries_for_area_id(area_id): self.async_update_entity(entry.entity_id, area_id=None) for key, deleted_entity in list(self.deleted_entities.items()): if deleted_entity.area_id != area_id: continue self.deleted_entities[key] = attr.evolve(deleted_entity, area_id=None) self.async_schedule_save() @callback @singleton(DATA_REGISTRY) def async_get(hass: HomeAssistant) -> EntityRegistry: """Get entity registry.""" return EntityRegistry(hass) async def async_load(hass: HomeAssistant, *, load_empty: bool = False) -> None: """Load entity registry.""" assert DATA_REGISTRY not in hass.data await async_get(hass).async_load(load_empty=load_empty) @callback def async_entries_for_device( registry: EntityRegistry, device_id: str, include_disabled_entities: bool = False ) -> list[RegistryEntry]: """Return entries that match a device.""" return registry.entities.get_entries_for_device_id( device_id, include_disabled_entities ) @callback def async_entries_for_area( registry: EntityRegistry, area_id: str ) -> list[RegistryEntry]: """Return entries that match an area.""" return registry.entities.get_entries_for_area_id(area_id) @callback def async_entries_for_label( registry: EntityRegistry, label_id: str ) -> list[RegistryEntry]: """Return entries that match a label.""" return registry.entities.get_entries_for_label(label_id) @callback def async_entries_for_category( registry: EntityRegistry, scope: str, category_id: str ) -> list[RegistryEntry]: """Return entries that match a category in a scope.""" return [ entry for entry in registry.entities.values() if ( (existing_category_id := entry.categories.get(scope)) and category_id == existing_category_id ) ] @callback def async_entries_for_config_entry( registry: EntityRegistry, config_entry_id: str ) -> list[RegistryEntry]: """Return entries that match a config entry.""" return registry.entities.get_entries_for_config_entry_id(config_entry_id) @callback def async_config_entry_disabled_by_changed( registry: EntityRegistry, config_entry: ConfigEntry ) -> None: """Handle a config entry being disabled or enabled. Disable entities in the registry that are associated with a config entry when the config entry is disabled, enable entities in the registry that are associated with a config entry when the config entry is enabled and the entities are marked DISABLED_CONFIG_ENTRY. """ entities = async_entries_for_config_entry(registry, config_entry.entry_id) if not config_entry.disabled_by: for entity in entities: if entity.disabled_by is not RegistryEntryDisabler.CONFIG_ENTRY: continue registry.async_update_entity(entity.entity_id, disabled_by=None) return for entity in entities: if entity.disabled: # Entity already disabled, do not overwrite continue registry.async_update_entity( entity.entity_id, disabled_by=RegistryEntryDisabler.CONFIG_ENTRY ) @callback def _async_setup_cleanup(hass: HomeAssistant, registry: EntityRegistry) -> None: """Clean up device registry when entities removed.""" from . import category_registry as cr, event, label_registry as lr # noqa: PLC0415 @callback def _removed_from_registry_filter( event_data: lr.EventLabelRegistryUpdatedData | cr.EventCategoryRegistryUpdatedData, ) -> bool: """Filter all except for the remove action from registry events.""" return event_data["action"] == "remove" @callback def _handle_label_registry_update(event: lr.EventLabelRegistryUpdated) -> None: """Update entity that have a label that has been removed.""" registry.async_clear_label_id(event.data["label_id"]) hass.bus.async_listen( event_type=lr.EVENT_LABEL_REGISTRY_UPDATED, event_filter=_removed_from_registry_filter, listener=_handle_label_registry_update, ) @callback def _handle_category_registry_update( event: cr.EventCategoryRegistryUpdated, ) -> None: """Update entity that have a category that has been removed.""" registry.async_clear_category_id(event.data["scope"], event.data["category_id"]) hass.bus.async_listen( event_type=cr.EVENT_CATEGORY_REGISTRY_UPDATED, event_filter=_removed_from_registry_filter, listener=_handle_category_registry_update, ) @callback def cleanup(_: datetime) -> None: """Clean up entity registry.""" # Periodic purge of orphaned entities to avoid the registry # growing without bounds when there are lots of deleted entities registry.async_purge_expired_orphaned_entities() cancel = event.async_track_time_interval( hass, cleanup, timedelta(seconds=CLEANUP_INTERVAL) ) @callback def _on_homeassistant_stop(event: Event) -> None: """Cancel cleanup.""" cancel() hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _on_homeassistant_stop) @callback def _async_setup_entity_restore(hass: HomeAssistant, registry: EntityRegistry) -> None: """Set up the entity restore mechanism.""" @callback def cleanup_restored_states_filter(event_data: Mapping[str, Any]) -> bool: """Clean up restored states filter.""" return (event_data["action"] == "remove") or ( event_data["action"] == "update" and "old_entity_id" in event_data and event_data["entity_id"] != event_data["old_entity_id"] ) @callback def cleanup_restored_states(event: Event[EventEntityRegistryUpdatedData]) -> None: """Clean up restored states.""" if event.data["action"] == "update": old_entity_id = event.data["old_entity_id"] old_state = hass.states.get(old_entity_id) if old_state is None or not old_state.attributes.get(ATTR_RESTORED): return hass.states.async_remove(old_entity_id, context=event.context) if entry := registry.async_get(event.data["entity_id"]): entry.write_unavailable_state(hass) return state = hass.states.get(event.data["entity_id"]) if state is None or not state.attributes.get(ATTR_RESTORED): return hass.states.async_remove(event.data["entity_id"], context=event.context) hass.bus.async_listen( EVENT_ENTITY_REGISTRY_UPDATED, cleanup_restored_states, event_filter=cleanup_restored_states_filter, ) if hass.is_running: return @callback def _write_unavailable_states(_: Event) -> None: """Make sure state machine contains entry for each registered entity.""" existing = set(hass.states.async_entity_ids()) for entry in registry.entities.values(): if entry.entity_id in existing or entry.disabled: continue entry.write_unavailable_state(hass) hass.bus.async_listen(EVENT_HOMEASSISTANT_START, _write_unavailable_states) async def async_migrate_entries( hass: HomeAssistant, config_entry_id: str, entry_callback: Callable[[RegistryEntry], dict[str, Any] | None], ) -> None: """Migrate entity registry entries which belong to a config entry. Can be used as a migrator of unique_ids or to update other entity registry data. Can also be used to remove duplicated entity registry entries. """ ent_reg = async_get(hass) entities = ent_reg.entities for entry in entities.get_entries_for_config_entry_id(config_entry_id): if ( entities.get_entry(entry.id) and (updates := entry_callback(entry)) is not None ): ent_reg.async_update_entity(entry.entity_id, **updates) @callback def async_validate_entity_id(registry: EntityRegistry, entity_id_or_uuid: str) -> str: """Validate and resolve an entity id or UUID to an entity id. Raises vol.Invalid if the entity or UUID is invalid, or if the UUID is not associated with an entity registry item. """ if valid_entity_id(entity_id_or_uuid): return entity_id_or_uuid if (entry := registry.entities.get_entry(entity_id_or_uuid)) is None: raise vol.Invalid(f"Unknown entity registry entry {entity_id_or_uuid}") return entry.entity_id @callback def async_resolve_entity_id( registry: EntityRegistry, entity_id_or_uuid: str ) -> str | None: """Validate and resolve an entity id or UUID to an entity id. Returns None if the entity or UUID is invalid, or if the UUID is not associated with an entity registry item. """ if valid_entity_id(entity_id_or_uuid): return entity_id_or_uuid if (entry := registry.entities.get_entry(entity_id_or_uuid)) is None: return None return entry.entity_id @callback def async_validate_entity_ids( registry: EntityRegistry, entity_ids_or_uuids: list[str] ) -> list[str]: """Validate and resolve a list of entity ids or UUIDs to a list of entity ids. Returns a list with UUID resolved to entity_ids. Raises vol.Invalid if any item is invalid, or if any a UUID is not associated with an entity registry item. """ return [async_validate_entity_id(registry, item) for item in entity_ids_or_uuids]