1
0
mirror of https://github.com/home-assistant/core.git synced 2026-05-08 17:49:37 +01:00

Support target conditions in automation relation extraction (#161016)

This commit is contained in:
Abílio Costa
2026-01-20 17:34:21 +00:00
committed by GitHub
parent 7c49656fa8
commit e077c65a77
3 changed files with 291 additions and 5 deletions
@@ -602,6 +602,10 @@ class AutomationEntity(BaseAutomationEntity, RestoreEntity):
"""Return a set of referenced labels."""
referenced = self.action_script.referenced_labels
if self._cond_func is not None:
for conf in self._cond_func.config:
referenced |= condition.async_extract_labels(conf)
for conf in self._trigger_config:
referenced |= set(_get_targets_from_trigger_config(conf, ATTR_LABEL_ID))
return referenced
@@ -611,6 +615,10 @@ class AutomationEntity(BaseAutomationEntity, RestoreEntity):
"""Return a set of referenced floors."""
referenced = self.action_script.referenced_floors
if self._cond_func is not None:
for conf in self._cond_func.config:
referenced |= condition.async_extract_floors(conf)
for conf in self._trigger_config:
referenced |= set(_get_targets_from_trigger_config(conf, ATTR_FLOOR_ID))
return referenced
@@ -620,6 +628,10 @@ class AutomationEntity(BaseAutomationEntity, RestoreEntity):
"""Return a set of referenced areas."""
referenced = self.action_script.referenced_areas
if self._cond_func is not None:
for conf in self._cond_func.config:
referenced |= condition.async_extract_areas(conf)
for conf in self._trigger_config:
referenced |= set(_get_targets_from_trigger_config(conf, ATTR_AREA_ID))
return referenced
+73 -4
View File
@@ -17,6 +17,7 @@ from typing import (
TYPE_CHECKING,
Any,
Final,
Literal,
Protocol,
TypedDict,
Unpack,
@@ -28,7 +29,10 @@ from typing import (
import voluptuous as vol
from homeassistant.const import (
ATTR_AREA_ID,
ATTR_DEVICE_CLASS,
ATTR_FLOOR_ID,
ATTR_LABEL_ID,
CONF_ABOVE,
CONF_AFTER,
CONF_ATTRIBUTE,
@@ -1346,13 +1350,18 @@ def async_extract_entities(config: ConfigType | Template) -> set[str]:
if entity_ids is not None:
referenced.update(entity_ids)
if target_entities := _get_targets_from_condition_config(
config, CONF_ENTITY_ID
):
referenced.update(target_entities)
return referenced
@callback
def async_extract_devices(config: ConfigType | Template) -> set[str]:
"""Extract devices from a condition."""
referenced = set()
referenced: set[str] = set()
to_process = deque([config])
while to_process:
@@ -1366,15 +1375,75 @@ def async_extract_devices(config: ConfigType | Template) -> set[str]:
to_process.extend(config["conditions"])
continue
if condition != "device":
if condition == "device":
if (device_id := config.get(CONF_DEVICE_ID)) is not None:
referenced.add(device_id)
continue
if (device_id := config.get(CONF_DEVICE_ID)) is not None:
referenced.add(device_id)
if target_devices := _get_targets_from_condition_config(config, CONF_DEVICE_ID):
referenced.update(target_devices)
return referenced
@callback
def async_extract_areas(config: ConfigType | Template) -> set[str]:
"""Extract areas from a condition."""
return _async_extract_targets(config, ATTR_AREA_ID)
@callback
def async_extract_floors(config: ConfigType | Template) -> set[str]:
"""Extract floors from a condition."""
return _async_extract_targets(config, ATTR_FLOOR_ID)
@callback
def async_extract_labels(config: ConfigType | Template) -> set[str]:
"""Extract labels from a condition."""
return _async_extract_targets(config, ATTR_LABEL_ID)
@callback
def _async_extract_targets(
config: ConfigType | Template,
target_type: Literal["entity_id", "device_id", "area_id", "floor_id", "label_id"],
) -> set[str]:
"""Extract targets from a condition."""
referenced: set[str] = set()
to_process = deque([config])
while to_process:
config = to_process.popleft()
if isinstance(config, Template):
continue
condition = config[CONF_CONDITION]
if condition in ("and", "not", "or"):
to_process.extend(config["conditions"])
continue
if targets := _get_targets_from_condition_config(config, target_type):
referenced.update(targets)
return referenced
@callback
def _get_targets_from_condition_config(
config: ConfigType,
target: Literal["entity_id", "device_id", "area_id", "floor_id", "label_id"],
) -> list[str]:
"""Extract targets from a condition target config."""
if not (target_conf := config.get(CONF_TARGET)):
return []
if not (targets := target_conf.get(target)):
return []
return [targets] if isinstance(targets, str) else targets
def _load_conditions_file(integration: Integration) -> dict[str, Any]:
"""Load conditions file for an integration."""
try:
+206 -1
View File
@@ -2232,7 +2232,7 @@ async def test_extraction_functions(
assert automation.blueprint_in_automation(hass, "automation.test3") is None
async def test_extraction_functions_with_targets(
async def test_extraction_functions_with_trigger_targets(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
hass_ws_client: WebSocketGenerator,
@@ -2428,6 +2428,211 @@ async def test_extraction_functions_with_targets(
}
async def test_extraction_functions_with_condition_targets(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test extraction functions with targets in conditions."""
config_entry = MockConfigEntry(domain="fake_integration", data={})
config_entry.mock_state(hass, ConfigEntryState.LOADED)
config_entry.add_to_hass(hass)
condition_device = device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "00:00:00:00:00:02")},
)
await async_setup_component(hass, "homeassistant", {})
await async_setup_component(hass, "light", {"light": {"platform": "demo"}})
await hass.async_block_till_done()
# Enable the new_triggers_conditions feature flag to allow new-style conditions
assert await async_setup_component(hass, "labs", {})
ws_client = await hass_ws_client(hass)
await ws_client.send_json_auto_id(
{
"type": "labs/update",
"domain": "automation",
"preview_feature": "new_triggers_conditions",
"enabled": True,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
await hass.async_block_till_done()
assert await async_setup_component(
hass,
DOMAIN,
{
DOMAIN: [
{
"alias": "test1",
"triggers": [
{"trigger": "state", "entity_id": "sensor.trigger_state"},
],
"conditions": [
# Single entity_id in target
{
"condition": "light.is_on",
"target": {"entity_id": "light.condition_entity"},
"options": {"behavior": "any"},
},
# Multiple entity_ids in target
{
"condition": "light.is_on",
"target": {
"entity_id": [
"light.condition_entity_list1",
"light.condition_entity_list2",
]
},
"options": {"behavior": "any"},
},
# Single device_id in target
{
"condition": "light.is_on",
"target": {"device_id": condition_device.id},
"options": {"behavior": "any"},
},
# Multiple device_ids in target
{
"condition": "light.is_on",
"target": {
"device_id": [
"target-device-1",
"target-device-2",
]
},
"options": {"behavior": "any"},
},
# Single area_id in target
{
"condition": "light.is_on",
"target": {"area_id": "area-condition-single"},
"options": {"behavior": "any"},
},
# Multiple area_ids in target
{
"condition": "light.is_on",
"target": {
"area_id": ["area-condition-1", "area-condition-2"]
},
"options": {"behavior": "any"},
},
# Single floor_id in target
{
"condition": "light.is_on",
"target": {"floor_id": "floor-condition-single"},
"options": {"behavior": "any"},
},
# Multiple floor_ids in target
{
"condition": "light.is_on",
"target": {
"floor_id": ["floor-condition-1", "floor-condition-2"]
},
"options": {"behavior": "any"},
},
# Single label_id in target
{
"condition": "light.is_on",
"target": {"label_id": "label-condition-single"},
"options": {"behavior": "any"},
},
# Multiple label_ids in target
{
"condition": "light.is_on",
"target": {
"label_id": ["label-condition-1", "label-condition-2"]
},
"options": {"behavior": "any"},
},
# Combined targets
{
"condition": "light.is_on",
"target": {
"entity_id": "light.combined_entity",
"device_id": "combined-device",
"area_id": "combined-area",
"floor_id": "combined-floor",
"label_id": "combined-label",
},
"options": {"behavior": "any"},
},
],
"actions": [
{
"action": "test.script",
"data": {"entity_id": "light.action_entity"},
},
],
},
]
},
)
# Test entity extraction from condition targets
assert set(automation.entities_in_automation(hass, "automation.test1")) == {
"sensor.trigger_state",
"light.condition_entity",
"light.condition_entity_list1",
"light.condition_entity_list2",
"light.combined_entity",
"light.action_entity",
}
# Test device extraction from condition targets
assert set(automation.devices_in_automation(hass, "automation.test1")) == {
condition_device.id,
"target-device-1",
"target-device-2",
"combined-device",
}
# Test area extraction from condition targets
assert set(automation.areas_in_automation(hass, "automation.test1")) == {
"area-condition-single",
"area-condition-1",
"area-condition-2",
"combined-area",
}
# Test floor extraction from condition targets
assert set(automation.floors_in_automation(hass, "automation.test1")) == {
"floor-condition-single",
"floor-condition-1",
"floor-condition-2",
"combined-floor",
}
# Test label extraction from condition targets
assert set(automation.labels_in_automation(hass, "automation.test1")) == {
"label-condition-single",
"label-condition-1",
"label-condition-2",
"combined-label",
}
# Test automations_with_* functions
assert set(automation.automations_with_entity(hass, "light.condition_entity")) == {
"automation.test1"
}
assert set(automation.automations_with_device(hass, condition_device.id)) == {
"automation.test1"
}
assert set(automation.automations_with_area(hass, "area-condition-single")) == {
"automation.test1"
}
assert set(automation.automations_with_floor(hass, "floor-condition-single")) == {
"automation.test1"
}
assert set(automation.automations_with_label(hass, "label-condition-single")) == {
"automation.test1"
}
async def test_logbook_humanify_automation_triggered_event(hass: HomeAssistant) -> None:
"""Test humanifying Automation Trigger event."""
hass.config.components.add("recorder")