"""Test for Template helper.""" from datetime import timedelta from unittest.mock import patch import pytest from homeassistant import config from homeassistant.components import labs from homeassistant.components.template import DOMAIN from homeassistant.config_entries import ConfigEntryState from homeassistant.const import SERVICE_RELOAD from homeassistant.core import Context, HomeAssistant from homeassistant.helpers import ( device_registry as dr, entity_registry as er, issue_registry as ir, ) from homeassistant.setup import async_setup_component from homeassistant.util import dt as dt_util from tests.common import ( MockConfigEntry, MockUser, async_capture_events, async_fire_time_changed, get_fixture_path, ) from tests.typing import WebSocketGenerator @pytest.mark.parametrize(("count", "domain"), [(1, "sensor")]) @pytest.mark.parametrize( "config", [ { "sensor": { "platform": DOMAIN, "sensors": { "state": { "value_template": "{{ states.sensor.test_sensor.state }}" }, }, }, "template": [ { "trigger": {"platform": "event", "event_type": "event_1"}, "sensor": { "name": "top level", "state": "{{ trigger.event.data.source }}", }, }, { "sensor": { "name": "top level state", "state": "{{ states.sensor.top_level.state }} + 2", }, "binary_sensor": { "name": "top level state", "state": "{{ states.sensor.top_level.state == 'init' }}", }, }, ], }, ], ) @pytest.mark.usefixtures("start_ha") async def test_reloadable(hass: HomeAssistant) -> None: """Test that we can reload.""" hass.states.async_set("sensor.test_sensor", "mytest") await hass.async_block_till_done() assert hass.states.get("sensor.top_level_state").state == "unknown + 2" assert hass.states.get("binary_sensor.top_level_state").state == "off" hass.bus.async_fire("event_1", {"source": "init"}) await hass.async_block_till_done() assert len(hass.states.async_all()) == 5 assert hass.states.get("sensor.state").state == "mytest" assert hass.states.get("sensor.top_level").state == "init" await hass.async_block_till_done() assert hass.states.get("sensor.top_level_state").state == "init + 2" assert hass.states.get("binary_sensor.top_level_state").state == "on" await async_yaml_patch_helper(hass, "sensor_configuration.yaml") assert len(hass.states.async_all()) == 4 hass.bus.async_fire("event_2", {"source": "reload"}) await hass.async_block_till_done() assert hass.states.get("sensor.state") is None assert hass.states.get("sensor.top_level") is None assert hass.states.get("sensor.watching_tv_in_master_bedroom").state == "off" assert float(hass.states.get("sensor.combined_sensor_energy_usage").state) == 0 assert hass.states.get("sensor.top_level_2").state == "reload" @pytest.mark.parametrize(("count", "domain"), [(1, "sensor")]) @pytest.mark.parametrize( "config", [ { "sensor": { "platform": DOMAIN, "sensors": { "state": { "value_template": "{{ states.sensor.test_sensor.state }}" }, }, }, "template": { "trigger": {"platform": "event", "event_type": "event_1"}, "sensor": { "name": "top level", "state": "{{ trigger.event.data.source }}", }, }, }, ], ) @pytest.mark.usefixtures("start_ha") async def test_reloadable_can_remove(hass: HomeAssistant) -> None: """Test that we can reload and remove all template sensors.""" hass.states.async_set("sensor.test_sensor", "mytest") await hass.async_block_till_done() hass.bus.async_fire("event_1", {"source": "init"}) await hass.async_block_till_done() assert len(hass.states.async_all()) == 3 assert hass.states.get("sensor.state").state == "mytest" assert hass.states.get("sensor.top_level").state == "init" await async_yaml_patch_helper(hass, "empty_configuration.yaml") assert len(hass.states.async_all()) == 1 @pytest.mark.parametrize(("count", "domain"), [(1, "sensor")]) @pytest.mark.parametrize( "config", [ { "sensor": { "platform": DOMAIN, "sensors": { "state": { "value_template": "{{ states.sensor.test_sensor.state }}" }, }, } }, ], ) @pytest.mark.usefixtures("start_ha") async def test_reloadable_stops_on_invalid_config(hass: HomeAssistant) -> None: """Test we stop the reload if configuration.yaml is completely broken.""" hass.states.async_set("sensor.test_sensor", "mytest") await hass.async_block_till_done() assert hass.states.get("sensor.state").state == "mytest" assert len(hass.states.async_all()) == 2 await async_yaml_patch_helper(hass, "configuration.yaml.corrupt") assert hass.states.get("sensor.state").state == "mytest" assert len(hass.states.async_all()) == 2 @pytest.mark.parametrize(("count", "domain"), [(1, "sensor")]) @pytest.mark.parametrize( "config", [ { "sensor": { "platform": DOMAIN, "sensors": { "state": { "value_template": "{{ states.sensor.test_sensor.state }}" }, }, } }, ], ) @pytest.mark.usefixtures("start_ha") async def test_reloadable_handles_partial_valid_config(hass: HomeAssistant) -> None: """Test we can still setup valid sensors when configuration.yaml has a broken entry.""" hass.states.async_set("sensor.test_sensor", "mytest") await hass.async_block_till_done() assert hass.states.get("sensor.state").state == "mytest" assert len(hass.states.async_all("sensor")) == 2 await async_yaml_patch_helper(hass, "broken_configuration.yaml") assert len(hass.states.async_all("sensor")) == 3 assert hass.states.get("sensor.state") is None assert hass.states.get("sensor.watching_tv_in_master_bedroom").state == "off" assert float(hass.states.get("sensor.combined_sensor_energy_usage").state) == 0 @pytest.mark.parametrize(("count", "domain"), [(1, "sensor")]) @pytest.mark.parametrize( "config", [ { "sensor": { "platform": DOMAIN, "sensors": { "state": { "value_template": "{{ states.sensor.test_sensor.state }}" }, }, } }, ], ) @pytest.mark.usefixtures("start_ha") async def test_reloadable_multiple_platforms(hass: HomeAssistant) -> None: """Test that we can reload.""" hass.states.async_set("sensor.test_sensor", "mytest") await async_setup_component( hass, "binary_sensor", { "binary_sensor": { "platform": DOMAIN, "sensors": { "state": { "value_template": "{{ states.sensor.test_sensor.state }}" }, }, } }, ) await hass.async_block_till_done() assert hass.states.get("sensor.state").state == "mytest" assert hass.states.get("binary_sensor.state").state == "off" assert len(hass.states.async_all()) == 3 await async_yaml_patch_helper(hass, "sensor_configuration.yaml") assert len(hass.states.async_all()) == 4 assert hass.states.get("sensor.state") is None assert hass.states.get("sensor.watching_tv_in_master_bedroom").state == "off" assert float(hass.states.get("sensor.combined_sensor_energy_usage").state) == 0 assert hass.states.get("sensor.top_level_2") is not None @pytest.mark.parametrize(("count", "domain"), [(1, "sensor")]) @pytest.mark.parametrize( "config", [ { "sensor": { "platform": DOMAIN, "sensors": { "state": {"value_template": "{{ 1 }}"}, }, } }, ], ) @pytest.mark.usefixtures("start_ha") async def test_reload_sensors_that_reference_other_template_sensors( hass: HomeAssistant, ) -> None: """Test that we can reload sensor that reference other template sensors.""" await async_yaml_patch_helper(hass, "ref_configuration.yaml") assert len(hass.states.async_all()) == 3 await hass.async_block_till_done() next_time = dt_util.utcnow() + timedelta(seconds=1.2) with patch( "homeassistant.helpers.ratelimit.time.time", return_value=next_time.timestamp() ): async_fire_time_changed(hass, next_time) await hass.async_block_till_done() assert hass.states.get("sensor.test1").state == "3" assert hass.states.get("sensor.test2").state == "1" assert hass.states.get("sensor.test3").state == "2" @pytest.mark.parametrize(("count", "domain"), [(1, "sensor")]) @pytest.mark.parametrize( "config", [ { "sensor": { "platform": DOMAIN, "sensors": { "state": { "value_template": "{{ states.sensor.test_sensor.state }}" }, "state2": { "value_template": "{{ states.sensor.test_sensor.state }}" }, "state3": { "value_template": "{{ states.sensor.test_sensor.state }}" }, }, }, }, ], ) @pytest.mark.usefixtures("start_ha") async def test_reload_removes_legacy_deprecation( hass: HomeAssistant, issue_registry: ir.IssueRegistry ) -> None: """Test that we can reload and remove all template sensors.""" hass.states.async_set("sensor.test_sensor", "old") await hass.async_block_till_done() assert len(hass.states.async_all()) == 4 assert hass.states.get("sensor.state").state == "old" assert hass.states.get("sensor.state2").state == "old" assert hass.states.get("sensor.state3").state == "old" assert len(issue_registry.issues) == 3 await async_yaml_patch_helper(hass, "legacy_template_deprecation.yaml") assert len(hass.states.async_all()) == 4 assert hass.states.get("sensor.state").state == "old" assert hass.states.get("sensor.state2").state == "old" assert hass.states.get("sensor.state3").state == "old" assert len(issue_registry.issues) == 1 async def async_yaml_patch_helper(hass: HomeAssistant, filename: str) -> None: """Help update configuration.yaml.""" yaml_path = get_fixture_path(filename, "template") with patch.object(config, "YAML_CONFIG_FILE", yaml_path): await hass.services.async_call( DOMAIN, SERVICE_RELOAD, {}, blocking=True, ) await hass.async_block_till_done() @pytest.mark.parametrize( ( "config_entry_options", "config_user_input", ), [ ( { "name": "My template", "state": "{{10}}", "template_type": "sensor", }, { "state": "{{12}}", }, ), ( { "template_type": "binary_sensor", "name": "My template", "state": "{{1 == 1}}", }, { "state": "{{1 == 2}}", }, ), ( { "template_type": "image", "name": "My template", "url": "http://example.com", }, { "url": "http://example.com", }, ), ( { "template_type": "button", "name": "My template", }, {}, ), ( { "template_type": "number", "name": "My template", "state": "{{ 10 }}", "min": 0, "max": 100, "step": 0.1, "set_value": { "action": "input_number.set_value", "target": {"entity_id": "input_number.test"}, "data": {"value": "{{ value }}"}, }, }, { "state": "{{ 11 }}", "min": 0, "max": 100, "step": 0.1, "set_value": { "action": "input_number.set_value", "target": {"entity_id": "input_number.test"}, "data": {"value": "{{ value }}"}, }, }, ), ( { "template_type": "select", "name": "My template", "state": "{{ 'on' }}", "options": "{{ ['off', 'on', 'auto'] }}", }, { "state": "{{ 'on' }}", "options": "{{ ['off', 'on', 'auto'] }}", }, ), ( { "template_type": "switch", "name": "My template", "value_template": "{{ true }}", }, { "value_template": "{{ true }}", }, ), ( { "template_type": "event", "name": "My template", "event_type": "{{ 'single' }}", "event_types": "{{ ['single', 'double'] }}", }, { "event_type": "{{ 'single' }}", "event_types": "{{ ['single', 'double'] }}", }, ), ( { "template_type": "update", "name": "My template", "latest_version": "{{ '1.0' }}", "installed_version": "{{ '1.0' }}", }, { "latest_version": "{{ '1.0' }}", "installed_version": "{{ '1.0' }}", }, ), ], ) async def test_change_device( hass: HomeAssistant, device_registry: dr.DeviceRegistry, entity_registry: er.EntityRegistry, config_entry_options: dict[str, str], config_user_input: dict[str, str], ) -> None: """Test the link between the device and the config entry. Test, for each platform, that the device was linked to the config entry and the link was removed when the device is changed in the integration options. """ def check_template_entities( template_entity_id: str, device_id: str | None = None, ) -> None: """Check that the template entity is linked to the correct device.""" template_entity_ids: list[str] = [] for template_entity in entity_registry.entities.get_entries_for_config_entry_id( template_config_entry.entry_id ): template_entity_ids.append(template_entity.entity_id) assert template_entity.device_id == device_id assert template_entity_ids == [template_entity_id] # Configure devices registry entry_device1 = MockConfigEntry() entry_device1.add_to_hass(hass) device1 = device_registry.async_get_or_create( config_entry_id=entry_device1.entry_id, identifiers={("test", "identifier_test1")}, connections={("mac", "20:31:32:33:34:01")}, ) entry_device2 = MockConfigEntry() entry_device2.add_to_hass(hass) device2 = device_registry.async_get_or_create( config_entry_id=entry_device1.entry_id, identifiers={("test", "identifier_test2")}, connections={("mac", "20:31:32:33:34:02")}, ) await hass.async_block_till_done() device_id1 = device1.id assert device_id1 is not None device_id2 = device2.id assert device_id2 is not None # Setup the config entry template_config_entry = MockConfigEntry( data={}, domain=DOMAIN, options=config_entry_options | {"device_id": device_id1}, title="Template", ) template_config_entry.add_to_hass(hass) assert await hass.config_entries.async_setup(template_config_entry.entry_id) await hass.async_block_till_done() template_entity_id = f"{config_entry_options['template_type']}.my_template" # Confirm that the template config entry has not been added to either device # and that the entities are linked to device 1 for device_id in (device_id1, device_id2): device = device_registry.async_get(device_id=device_id) assert template_config_entry.entry_id not in device.config_entries check_template_entities(template_entity_id, device_id1) # Change config options to use device 2 and reload the integration result = await hass.config_entries.options.async_init( template_config_entry.entry_id ) result = await hass.config_entries.options.async_configure( result["flow_id"], user_input=config_user_input | {"device_id": device_id2}, ) await hass.async_block_till_done() # Confirm that the template config entry has not been added to either device # and that the entities are linked to device 2 for device_id in (device_id1, device_id2): device = device_registry.async_get(device_id=device_id) assert template_config_entry.entry_id not in device.config_entries check_template_entities(template_entity_id, device_id2) # Change the config options to remove the device and reload the integration result = await hass.config_entries.options.async_init( template_config_entry.entry_id ) result = await hass.config_entries.options.async_configure( result["flow_id"], user_input=config_user_input, ) await hass.async_block_till_done() # Confirm that the template config entry has not been added to either device # and that the entities are not linked to any device for device_id in (device_id1, device_id2): device = device_registry.async_get(device_id=device_id) assert template_config_entry.entry_id not in device.config_entries check_template_entities(template_entity_id, None) # Confirm that there is no device with the helper config entry assert ( dr.async_entries_for_config_entry( device_registry, template_config_entry.entry_id ) == [] ) async def test_fail_non_numerical_number_settings( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test that non numerical number options causes config entry setup to fail. Support for non numerical max, min and step was added in HA Core 2024.9.0 and removed in HA Core 2024.9.1. """ options = { "template_type": "number", "name": "My template", "state": "{{ 10 }}", "min": "{{ 0 }}", "max": "{{ 100 }}", "step": "{{ 0.1 }}", "set_value": { "action": "input_number.set_value", "target": {"entity_id": "input_number.test"}, "data": {"value": "{{ value }}"}, }, } # Setup the config entry template_config_entry = MockConfigEntry( data={}, domain=DOMAIN, options=options, title="Template", ) template_config_entry.add_to_hass(hass) assert not await hass.config_entries.async_setup(template_config_entry.entry_id) assert ( "The 'My template' number template needs to be reconfigured, " "max must be a number, got '{{ 100 }}'" in caplog.text ) async def test_yaml_reload_when_labs_flag_changes( hass: HomeAssistant, hass_ws_client: WebSocketGenerator, hass_admin_user: MockUser, hass_read_only_user: MockUser, ) -> None: """Test templates are reloaded when labs flag changes.""" ws_client = await hass_ws_client(hass) assert await async_setup_component( hass, DOMAIN, { DOMAIN: { "triggers": { "trigger": "event", "event_type": "test_event", }, "sensor": { "name": "hello", "state": "{{ trigger.event.data.stuff }}", }, } }, ) assert await async_setup_component(hass, labs.DOMAIN, {}) assert hass.states.get("sensor.hello") is not None assert hass.states.get("sensor.bye") is None listeners = hass.bus.async_listeners() assert listeners.get("test_event") == 1 assert listeners.get("test_event2") is None context = Context() hass.bus.async_fire("test_event", {"stuff": "foo"}, context=context) await hass.async_block_till_done() assert hass.states.get("sensor.hello").state == "foo" test_reload_event = async_capture_events(hass, "event_template_reloaded") # Check we reload whenever the labs flag is set, even if it's already enabled last_state = "unknown" for enabled, set_state in ( (True, "foo"), (True, "bar"), (False, "beer"), (False, "good"), ): test_reload_event.clear() with patch( "homeassistant.config.load_yaml_config_file", autospec=True, return_value={ DOMAIN: { "triggers": { "trigger": "event", "event_type": "test_event2", }, "sensor": { "name": "bye", "state": "{{ trigger.event.data.stuff }}", }, } }, ): await ws_client.send_json_auto_id( { "type": "labs/update", "domain": "automation", "preview_feature": "new_triggers_conditions", "enabled": enabled, } ) msg = await ws_client.receive_json() assert msg["success"] await hass.async_block_till_done() assert len(test_reload_event) == 1 assert hass.states.get("sensor.hello") is None assert hass.states.get("sensor.bye") is not None listeners = hass.bus.async_listeners() assert listeners.get("test_event") is None assert listeners.get("test_event2") == 1 hass.bus.async_fire("test_event", {"stuff": "foo"}, context=context) await hass.async_block_till_done() assert hass.states.get("sensor.bye").state == last_state hass.bus.async_fire("test_event2", {"stuff": set_state}, context=context) await hass.async_block_till_done() assert hass.states.get("sensor.bye").state == set_state last_state = set_state async def test_config_entry_reload_when_labs_flag_changes( hass: HomeAssistant, hass_ws_client: WebSocketGenerator, hass_admin_user: MockUser, hass_read_only_user: MockUser, ) -> None: """Test templates are reloaded when labs flag changes.""" ws_client = await hass_ws_client(hass) template_config_entry = MockConfigEntry( data={}, domain=DOMAIN, options={ "name": "hello", "template_type": "sensor", "state": "{{ 'foo' }}", }, title="My template", ) template_config_entry.add_to_hass(hass) assert await hass.config_entries.async_setup(template_config_entry.entry_id) await hass.async_block_till_done() assert await async_setup_component(hass, labs.DOMAIN, {}) assert hass.states.get("sensor.hello") is not None assert hass.states.get("sensor.hello").state == "foo" # Check we reload whenever the labs flag is set, even if it's already enabled for enabled, set_state in ( (True, "beer"), (True, "is"), (False, "very"), (False, "good"), ): hass.config_entries.async_update_entry( template_config_entry, options={ "name": "hello", "template_type": "sensor", "state": f"{{{{ '{set_state}' }}}}", }, ) with patch( "homeassistant.config.load_yaml_config_file", autospec=True, return_value={}, ): await ws_client.send_json_auto_id( { "type": "labs/update", "domain": "automation", "preview_feature": "new_triggers_conditions", "enabled": enabled, } ) msg = await ws_client.receive_json() assert msg["success"] await hass.async_block_till_done() assert hass.states.get("sensor.hello") is not None assert hass.states.get("sensor.hello").state == set_state async def test_migration_1_1( hass: HomeAssistant, device_registry: dr.DeviceRegistry, entity_registry: er.EntityRegistry, ) -> None: """Test migration from v1.1 removes template config entry from device.""" device_config_entry = MockConfigEntry() device_config_entry.add_to_hass(hass) device_entry = device_registry.async_get_or_create( config_entry_id=device_config_entry.entry_id, identifiers={("test", "identifier_test")}, connections={("mac", "30:31:32:33:34:35")}, ) template_config_entry = MockConfigEntry( data={}, domain=DOMAIN, options={ "name": "My template", "template_type": "sensor", "state": "{{ 'foo' }}", "device_id": device_entry.id, }, title="My template", version=1, minor_version=1, ) template_config_entry.add_to_hass(hass) # Add the helper config entry to the device device_registry.async_update_device( device_entry.id, add_config_entry_id=template_config_entry.entry_id ) # Check preconditions device_entry = device_registry.async_get(device_entry.id) assert template_config_entry.entry_id in device_entry.config_entries await hass.config_entries.async_setup(template_config_entry.entry_id) await hass.async_block_till_done() assert template_config_entry.state is ConfigEntryState.LOADED # Check that the helper config entry is removed from the device and the helper # entity is linked to the source device device_entry = device_registry.async_get(device_entry.id) assert template_config_entry.entry_id not in device_entry.config_entries template_entity_entry = entity_registry.async_get("sensor.my_template") assert template_entity_entry.device_id == device_entry.id assert template_config_entry.version == 1 assert template_config_entry.minor_version == 2 async def test_migration_from_future_version( hass: HomeAssistant, ) -> None: """Test migration from future version.""" config_entry = MockConfigEntry( data={}, domain=DOMAIN, options={ "name": "hello", "template_type": "sensor", "state": "{{ 'foo' }}", }, title="My template", version=2, minor_version=1, ) config_entry.add_to_hass(hass) await hass.config_entries.async_setup(config_entry.entry_id) await hass.async_block_till_done() assert config_entry.state is ConfigEntryState.MIGRATION_ERROR