mirror of
https://github.com/home-assistant/core.git
synced 2025-12-22 20:09:35 +00:00
Increase default rate limit for all states and entire domain states to one minute Ensure specifically referenced entities are excluded from the rate limit
2805 lines
85 KiB
Python
2805 lines
85 KiB
Python
"""Test event helpers."""
|
|
# pylint: disable=protected-access
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
|
|
from astral import Astral
|
|
import jinja2
|
|
import pytest
|
|
|
|
from homeassistant.components import sun
|
|
from homeassistant.const import MATCH_ALL
|
|
import homeassistant.core as ha
|
|
from homeassistant.core import callback
|
|
from homeassistant.exceptions import TemplateError
|
|
from homeassistant.helpers.entity_registry import EVENT_ENTITY_REGISTRY_UPDATED
|
|
from homeassistant.helpers.event import (
|
|
TrackStates,
|
|
TrackTemplate,
|
|
TrackTemplateResult,
|
|
async_call_later,
|
|
async_track_point_in_time,
|
|
async_track_point_in_utc_time,
|
|
async_track_same_state,
|
|
async_track_state_added_domain,
|
|
async_track_state_change,
|
|
async_track_state_change_event,
|
|
async_track_state_change_filtered,
|
|
async_track_state_removed_domain,
|
|
async_track_sunrise,
|
|
async_track_sunset,
|
|
async_track_template,
|
|
async_track_template_result,
|
|
async_track_time_change,
|
|
async_track_time_interval,
|
|
async_track_utc_time_change,
|
|
track_point_in_utc_time,
|
|
)
|
|
from homeassistant.helpers.template import Template
|
|
from homeassistant.setup import async_setup_component
|
|
import homeassistant.util.dt as dt_util
|
|
|
|
from tests.async_mock import patch
|
|
from tests.common import async_fire_time_changed
|
|
|
|
DEFAULT_TIME_ZONE = dt_util.DEFAULT_TIME_ZONE
|
|
|
|
|
|
def teardown():
|
|
"""Stop everything that was started."""
|
|
dt_util.set_default_time_zone(DEFAULT_TIME_ZONE)
|
|
|
|
|
|
async def test_track_point_in_time(hass):
|
|
"""Test track point in time."""
|
|
before_birthday = datetime(1985, 7, 9, 12, 0, 0, tzinfo=dt_util.UTC)
|
|
birthday_paulus = datetime(1986, 7, 9, 12, 0, 0, tzinfo=dt_util.UTC)
|
|
after_birthday = datetime(1987, 7, 9, 12, 0, 0, tzinfo=dt_util.UTC)
|
|
|
|
runs = []
|
|
|
|
async_track_point_in_utc_time(
|
|
hass, callback(lambda x: runs.append(x)), birthday_paulus
|
|
)
|
|
|
|
async_fire_time_changed(hass, before_birthday)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 0
|
|
|
|
async_fire_time_changed(hass, birthday_paulus)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
|
|
# A point in time tracker will only fire once, this should do nothing
|
|
async_fire_time_changed(hass, birthday_paulus)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
|
|
async_track_point_in_utc_time(
|
|
hass, callback(lambda x: runs.append(x)), birthday_paulus
|
|
)
|
|
|
|
async_fire_time_changed(hass, after_birthday)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 2
|
|
|
|
unsub = async_track_point_in_time(
|
|
hass, callback(lambda x: runs.append(x)), birthday_paulus
|
|
)
|
|
unsub()
|
|
|
|
async_fire_time_changed(hass, after_birthday)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 2
|
|
|
|
|
|
async def test_track_state_change_from_to_state_match(hass):
|
|
"""Test track_state_change with from and to state matchers."""
|
|
from_and_to_state_runs = []
|
|
only_from_runs = []
|
|
only_to_runs = []
|
|
match_all_runs = []
|
|
no_to_from_specified_runs = []
|
|
|
|
def from_and_to_state_callback(entity_id, old_state, new_state):
|
|
from_and_to_state_runs.append(1)
|
|
|
|
def only_from_state_callback(entity_id, old_state, new_state):
|
|
only_from_runs.append(1)
|
|
|
|
def only_to_state_callback(entity_id, old_state, new_state):
|
|
only_to_runs.append(1)
|
|
|
|
def match_all_callback(entity_id, old_state, new_state):
|
|
match_all_runs.append(1)
|
|
|
|
def no_to_from_specified_callback(entity_id, old_state, new_state):
|
|
no_to_from_specified_runs.append(1)
|
|
|
|
async_track_state_change(
|
|
hass, "light.Bowl", from_and_to_state_callback, "on", "off"
|
|
)
|
|
async_track_state_change(hass, "light.Bowl", only_from_state_callback, "on", None)
|
|
async_track_state_change(
|
|
hass, "light.Bowl", only_to_state_callback, None, ["off", "standby"]
|
|
)
|
|
async_track_state_change(
|
|
hass, "light.Bowl", match_all_callback, MATCH_ALL, MATCH_ALL
|
|
)
|
|
async_track_state_change(hass, "light.Bowl", no_to_from_specified_callback)
|
|
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(from_and_to_state_runs) == 0
|
|
assert len(only_from_runs) == 0
|
|
assert len(only_to_runs) == 0
|
|
assert len(match_all_runs) == 1
|
|
assert len(no_to_from_specified_runs) == 1
|
|
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(from_and_to_state_runs) == 1
|
|
assert len(only_from_runs) == 1
|
|
assert len(only_to_runs) == 1
|
|
assert len(match_all_runs) == 2
|
|
assert len(no_to_from_specified_runs) == 2
|
|
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(from_and_to_state_runs) == 1
|
|
assert len(only_from_runs) == 1
|
|
assert len(only_to_runs) == 1
|
|
assert len(match_all_runs) == 3
|
|
assert len(no_to_from_specified_runs) == 3
|
|
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(from_and_to_state_runs) == 1
|
|
assert len(only_from_runs) == 1
|
|
assert len(only_to_runs) == 1
|
|
assert len(match_all_runs) == 3
|
|
assert len(no_to_from_specified_runs) == 3
|
|
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(from_and_to_state_runs) == 2
|
|
assert len(only_from_runs) == 2
|
|
assert len(only_to_runs) == 2
|
|
assert len(match_all_runs) == 4
|
|
assert len(no_to_from_specified_runs) == 4
|
|
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(from_and_to_state_runs) == 2
|
|
assert len(only_from_runs) == 2
|
|
assert len(only_to_runs) == 2
|
|
assert len(match_all_runs) == 4
|
|
assert len(no_to_from_specified_runs) == 4
|
|
|
|
|
|
async def test_track_state_change(hass):
|
|
"""Test track_state_change."""
|
|
# 2 lists to track how often our callbacks get called
|
|
specific_runs = []
|
|
wildcard_runs = []
|
|
wildercard_runs = []
|
|
|
|
def specific_run_callback(entity_id, old_state, new_state):
|
|
specific_runs.append(1)
|
|
|
|
# This is the rare use case
|
|
async_track_state_change(hass, "light.Bowl", specific_run_callback, "on", "off")
|
|
|
|
@ha.callback
|
|
def wildcard_run_callback(entity_id, old_state, new_state):
|
|
wildcard_runs.append((old_state, new_state))
|
|
|
|
# This is the most common use case
|
|
async_track_state_change(hass, "light.Bowl", wildcard_run_callback)
|
|
|
|
async def wildercard_run_callback(entity_id, old_state, new_state):
|
|
wildercard_runs.append((old_state, new_state))
|
|
|
|
async_track_state_change(hass, MATCH_ALL, wildercard_run_callback)
|
|
|
|
# Adding state to state machine
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
assert len(wildcard_runs) == 1
|
|
assert len(wildercard_runs) == 1
|
|
assert wildcard_runs[-1][0] is None
|
|
assert wildcard_runs[-1][1] is not None
|
|
|
|
# Set same state should not trigger a state change/listener
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
assert len(wildcard_runs) == 1
|
|
assert len(wildercard_runs) == 1
|
|
|
|
# State change off -> on
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 2
|
|
assert len(wildercard_runs) == 2
|
|
|
|
# State change off -> off
|
|
hass.states.async_set("light.Bowl", "off", {"some_attr": 1})
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 3
|
|
assert len(wildercard_runs) == 3
|
|
|
|
# State change off -> on
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 4
|
|
assert len(wildercard_runs) == 4
|
|
|
|
hass.states.async_remove("light.bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 5
|
|
assert len(wildercard_runs) == 5
|
|
assert wildcard_runs[-1][0] is not None
|
|
assert wildcard_runs[-1][1] is None
|
|
assert wildercard_runs[-1][0] is not None
|
|
assert wildercard_runs[-1][1] is None
|
|
|
|
# Set state for different entity id
|
|
hass.states.async_set("switch.kitchen", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 5
|
|
assert len(wildercard_runs) == 6
|
|
|
|
|
|
async def test_async_track_state_change_filtered(hass):
|
|
"""Test async_track_state_change_filtered."""
|
|
single_entity_id_tracker = []
|
|
multiple_entity_id_tracker = []
|
|
|
|
@ha.callback
|
|
def single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
single_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def multiple_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
multiple_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def callback_that_throws(event):
|
|
raise ValueError
|
|
|
|
track_single = async_track_state_change_filtered(
|
|
hass, TrackStates(False, {"light.bowl"}, None), single_run_callback
|
|
)
|
|
assert track_single.listeners == {
|
|
"all": False,
|
|
"domains": None,
|
|
"entities": {"light.bowl"},
|
|
}
|
|
|
|
track_multi = async_track_state_change_filtered(
|
|
hass, TrackStates(False, {"light.bowl"}, {"switch"}), multiple_run_callback
|
|
)
|
|
assert track_multi.listeners == {
|
|
"all": False,
|
|
"domains": {"switch"},
|
|
"entities": {"light.bowl"},
|
|
}
|
|
|
|
track_throws = async_track_state_change_filtered(
|
|
hass, TrackStates(False, {"light.bowl"}, {"switch"}), callback_that_throws
|
|
)
|
|
assert track_throws.listeners == {
|
|
"all": False,
|
|
"domains": {"switch"},
|
|
"entities": {"light.bowl"},
|
|
}
|
|
|
|
# Adding state to state machine
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert single_entity_id_tracker[-1][0] is None
|
|
assert single_entity_id_tracker[-1][1] is not None
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
assert multiple_entity_id_tracker[-1][0] is None
|
|
assert multiple_entity_id_tracker[-1][1] is not None
|
|
|
|
# Set same state should not trigger a state change/listener
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
|
|
# State change off -> on
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 2
|
|
assert len(multiple_entity_id_tracker) == 2
|
|
|
|
# State change off -> off
|
|
hass.states.async_set("light.Bowl", "off", {"some_attr": 1})
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 3
|
|
assert len(multiple_entity_id_tracker) == 3
|
|
|
|
# State change off -> on
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 4
|
|
assert len(multiple_entity_id_tracker) == 4
|
|
|
|
hass.states.async_remove("light.bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 5
|
|
assert single_entity_id_tracker[-1][0] is not None
|
|
assert single_entity_id_tracker[-1][1] is None
|
|
assert len(multiple_entity_id_tracker) == 5
|
|
assert multiple_entity_id_tracker[-1][0] is not None
|
|
assert multiple_entity_id_tracker[-1][1] is None
|
|
|
|
# Set state for different entity id
|
|
hass.states.async_set("switch.kitchen", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 5
|
|
assert len(multiple_entity_id_tracker) == 6
|
|
|
|
track_single.async_remove()
|
|
# Ensure unsubing the listener works
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 5
|
|
assert len(multiple_entity_id_tracker) == 7
|
|
|
|
assert track_multi.listeners == {
|
|
"all": False,
|
|
"domains": {"switch"},
|
|
"entities": {"light.bowl"},
|
|
}
|
|
track_multi.async_update_listeners(TrackStates(False, {"light.bowl"}, None))
|
|
assert track_multi.listeners == {
|
|
"all": False,
|
|
"domains": None,
|
|
"entities": {"light.bowl"},
|
|
}
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(multiple_entity_id_tracker) == 8
|
|
hass.states.async_set("switch.kitchen", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(multiple_entity_id_tracker) == 8
|
|
|
|
track_multi.async_update_listeners(TrackStates(True, None, None))
|
|
hass.states.async_set("switch.kitchen", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(multiple_entity_id_tracker) == 8
|
|
hass.states.async_set("switch.any", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(multiple_entity_id_tracker) == 9
|
|
|
|
track_multi.async_remove()
|
|
track_throws.async_remove()
|
|
|
|
|
|
async def test_async_track_state_change_event(hass):
|
|
"""Test async_track_state_change_event."""
|
|
single_entity_id_tracker = []
|
|
multiple_entity_id_tracker = []
|
|
|
|
@ha.callback
|
|
def single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
single_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def multiple_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
multiple_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def callback_that_throws(event):
|
|
raise ValueError
|
|
|
|
unsub_single = async_track_state_change_event(
|
|
hass, ["light.Bowl"], single_run_callback
|
|
)
|
|
unsub_multi = async_track_state_change_event(
|
|
hass, ["light.Bowl", "switch.kitchen"], multiple_run_callback
|
|
)
|
|
unsub_throws = async_track_state_change_event(
|
|
hass, ["light.Bowl", "switch.kitchen"], callback_that_throws
|
|
)
|
|
|
|
# Adding state to state machine
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert single_entity_id_tracker[-1][0] is None
|
|
assert single_entity_id_tracker[-1][1] is not None
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
assert multiple_entity_id_tracker[-1][0] is None
|
|
assert multiple_entity_id_tracker[-1][1] is not None
|
|
|
|
# Set same state should not trigger a state change/listener
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
|
|
# State change off -> on
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 2
|
|
assert len(multiple_entity_id_tracker) == 2
|
|
|
|
# State change off -> off
|
|
hass.states.async_set("light.Bowl", "off", {"some_attr": 1})
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 3
|
|
assert len(multiple_entity_id_tracker) == 3
|
|
|
|
# State change off -> on
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 4
|
|
assert len(multiple_entity_id_tracker) == 4
|
|
|
|
hass.states.async_remove("light.bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 5
|
|
assert single_entity_id_tracker[-1][0] is not None
|
|
assert single_entity_id_tracker[-1][1] is None
|
|
assert len(multiple_entity_id_tracker) == 5
|
|
assert multiple_entity_id_tracker[-1][0] is not None
|
|
assert multiple_entity_id_tracker[-1][1] is None
|
|
|
|
# Set state for different entity id
|
|
hass.states.async_set("switch.kitchen", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 5
|
|
assert len(multiple_entity_id_tracker) == 6
|
|
|
|
unsub_single()
|
|
# Ensure unsubing the listener works
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 5
|
|
assert len(multiple_entity_id_tracker) == 7
|
|
|
|
unsub_multi()
|
|
unsub_throws()
|
|
|
|
|
|
async def test_async_track_state_added_domain(hass):
|
|
"""Test async_track_state_added_domain."""
|
|
single_entity_id_tracker = []
|
|
multiple_entity_id_tracker = []
|
|
|
|
@ha.callback
|
|
def single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
single_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def multiple_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
multiple_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def callback_that_throws(event):
|
|
raise ValueError
|
|
|
|
unsub_single = async_track_state_added_domain(hass, "light", single_run_callback)
|
|
unsub_multi = async_track_state_added_domain(
|
|
hass, ["light", "switch"], multiple_run_callback
|
|
)
|
|
unsub_throws = async_track_state_added_domain(
|
|
hass, ["light", "switch"], callback_that_throws
|
|
)
|
|
|
|
# Adding state to state machine
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert single_entity_id_tracker[-1][0] is None
|
|
assert single_entity_id_tracker[-1][1] is not None
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
assert multiple_entity_id_tracker[-1][0] is None
|
|
assert multiple_entity_id_tracker[-1][1] is not None
|
|
|
|
# Set same state should not trigger a state change/listener
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
|
|
# State change off -> on - nothing added so no trigger
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
|
|
# State change off -> off - nothing added so no trigger
|
|
hass.states.async_set("light.Bowl", "off", {"some_attr": 1})
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
|
|
# Removing state does not trigger
|
|
hass.states.async_remove("light.bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
|
|
# Set state for different entity id
|
|
hass.states.async_set("switch.kitchen", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 2
|
|
|
|
unsub_single()
|
|
# Ensure unsubing the listener works
|
|
hass.states.async_set("light.new", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(multiple_entity_id_tracker) == 3
|
|
|
|
unsub_multi()
|
|
unsub_throws()
|
|
|
|
|
|
async def test_async_track_state_removed_domain(hass):
|
|
"""Test async_track_state_removed_domain."""
|
|
single_entity_id_tracker = []
|
|
multiple_entity_id_tracker = []
|
|
|
|
@ha.callback
|
|
def single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
single_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def multiple_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
multiple_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def callback_that_throws(event):
|
|
raise ValueError
|
|
|
|
unsub_single = async_track_state_removed_domain(hass, "light", single_run_callback)
|
|
unsub_multi = async_track_state_removed_domain(
|
|
hass, ["light", "switch"], multiple_run_callback
|
|
)
|
|
unsub_throws = async_track_state_removed_domain(
|
|
hass, ["light", "switch"], callback_that_throws
|
|
)
|
|
|
|
# Adding state to state machine
|
|
hass.states.async_set("light.Bowl", "on")
|
|
hass.states.async_remove("light.Bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert single_entity_id_tracker[-1][1] is None
|
|
assert single_entity_id_tracker[-1][0] is not None
|
|
assert len(multiple_entity_id_tracker) == 1
|
|
assert multiple_entity_id_tracker[-1][1] is None
|
|
assert multiple_entity_id_tracker[-1][0] is not None
|
|
|
|
# Added and than removed (light)
|
|
hass.states.async_set("light.Bowl", "on")
|
|
hass.states.async_remove("light.Bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 2
|
|
assert len(multiple_entity_id_tracker) == 2
|
|
|
|
# Added and than removed (light)
|
|
hass.states.async_set("light.Bowl", "off")
|
|
hass.states.async_remove("light.Bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 3
|
|
assert len(multiple_entity_id_tracker) == 3
|
|
|
|
# Added and than removed (light)
|
|
hass.states.async_set("light.Bowl", "off", {"some_attr": 1})
|
|
hass.states.async_remove("light.Bowl")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 4
|
|
assert len(multiple_entity_id_tracker) == 4
|
|
|
|
# Added and than removed (switch)
|
|
hass.states.async_set("switch.kitchen", "on")
|
|
hass.states.async_remove("switch.kitchen")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 4
|
|
assert len(multiple_entity_id_tracker) == 5
|
|
|
|
unsub_single()
|
|
# Ensure unsubing the listener works
|
|
hass.states.async_set("light.new", "off")
|
|
hass.states.async_remove("light.new")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 4
|
|
assert len(multiple_entity_id_tracker) == 6
|
|
|
|
unsub_multi()
|
|
unsub_throws()
|
|
|
|
|
|
async def test_async_track_state_removed_domain_match_all(hass):
|
|
"""Test async_track_state_removed_domain with a match_all."""
|
|
single_entity_id_tracker = []
|
|
match_all_entity_id_tracker = []
|
|
|
|
@ha.callback
|
|
def single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
single_entity_id_tracker.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def match_all_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
match_all_entity_id_tracker.append((old_state, new_state))
|
|
|
|
unsub_single = async_track_state_removed_domain(hass, "light", single_run_callback)
|
|
unsub_match_all = async_track_state_removed_domain(
|
|
hass, MATCH_ALL, match_all_run_callback
|
|
)
|
|
hass.states.async_set("light.new", "off")
|
|
hass.states.async_remove("light.new")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(match_all_entity_id_tracker) == 1
|
|
|
|
hass.states.async_set("switch.new", "off")
|
|
hass.states.async_remove("switch.new")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(match_all_entity_id_tracker) == 2
|
|
|
|
unsub_match_all()
|
|
unsub_single()
|
|
hass.states.async_set("switch.new", "off")
|
|
hass.states.async_remove("switch.new")
|
|
await hass.async_block_till_done()
|
|
assert len(single_entity_id_tracker) == 1
|
|
assert len(match_all_entity_id_tracker) == 2
|
|
|
|
|
|
async def test_track_template(hass):
|
|
"""Test tracking template."""
|
|
specific_runs = []
|
|
wildcard_runs = []
|
|
wildercard_runs = []
|
|
|
|
template_condition = Template("{{states.switch.test.state == 'on'}}", hass)
|
|
template_condition_var = Template(
|
|
"{{states.switch.test.state == 'on' and test == 5}}", hass
|
|
)
|
|
|
|
hass.states.async_set("switch.test", "off")
|
|
|
|
def specific_run_callback(entity_id, old_state, new_state):
|
|
specific_runs.append(1)
|
|
|
|
async_track_template(hass, template_condition, specific_run_callback)
|
|
|
|
@ha.callback
|
|
def wildcard_run_callback(entity_id, old_state, new_state):
|
|
wildcard_runs.append((old_state, new_state))
|
|
|
|
async_track_template(hass, template_condition, wildcard_run_callback)
|
|
|
|
async def wildercard_run_callback(entity_id, old_state, new_state):
|
|
wildercard_runs.append((old_state, new_state))
|
|
|
|
async_track_template(
|
|
hass, template_condition_var, wildercard_run_callback, {"test": 5}
|
|
)
|
|
|
|
hass.states.async_set("switch.test", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 1
|
|
assert len(wildercard_runs) == 1
|
|
|
|
hass.states.async_set("switch.test", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 1
|
|
assert len(wildercard_runs) == 1
|
|
|
|
hass.states.async_set("switch.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 1
|
|
assert len(wildercard_runs) == 1
|
|
|
|
hass.states.async_set("switch.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 1
|
|
assert len(wildercard_runs) == 1
|
|
|
|
hass.states.async_set("switch.test", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 2
|
|
assert len(wildcard_runs) == 2
|
|
assert len(wildercard_runs) == 2
|
|
|
|
template_iterate = Template("{{ (states.switch | length) > 0 }}", hass)
|
|
iterate_calls = []
|
|
|
|
@ha.callback
|
|
def iterate_callback(entity_id, old_state, new_state):
|
|
iterate_calls.append((entity_id, old_state, new_state))
|
|
|
|
async_track_template(hass, template_iterate, iterate_callback)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.states.async_set("switch.new", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(iterate_calls) == 1
|
|
assert iterate_calls[0][0] == "switch.new"
|
|
assert iterate_calls[0][1] is None
|
|
assert iterate_calls[0][2].state == "on"
|
|
|
|
|
|
async def test_track_template_error(hass, caplog):
|
|
"""Test tracking template with error."""
|
|
template_error = Template("{{ (states.switch | lunch) > 0 }}", hass)
|
|
error_calls = []
|
|
|
|
@ha.callback
|
|
def error_callback(entity_id, old_state, new_state):
|
|
error_calls.append((entity_id, old_state, new_state))
|
|
|
|
async_track_template(hass, template_error, error_callback)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.states.async_set("switch.new", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert not error_calls
|
|
assert "lunch" in caplog.text
|
|
assert "TemplateAssertionError" in caplog.text
|
|
|
|
caplog.clear()
|
|
|
|
with patch.object(Template, "async_render") as render:
|
|
render.return_value = "ok"
|
|
|
|
hass.states.async_set("switch.not_exist", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert "no filter named 'lunch'" not in caplog.text
|
|
assert "TemplateAssertionError" not in caplog.text
|
|
|
|
|
|
async def test_track_template_error_can_recover(hass, caplog):
|
|
"""Test tracking template with error."""
|
|
hass.states.async_set("switch.data_system", "cow", {"opmode": 0})
|
|
template_error = Template(
|
|
"{{ states.sensor.data_system.attributes['opmode'] == '0' }}", hass
|
|
)
|
|
error_calls = []
|
|
|
|
@ha.callback
|
|
def error_callback(entity_id, old_state, new_state):
|
|
error_calls.append((entity_id, old_state, new_state))
|
|
|
|
async_track_template(hass, template_error, error_callback)
|
|
await hass.async_block_till_done()
|
|
assert not error_calls
|
|
|
|
hass.states.async_remove("switch.data_system")
|
|
|
|
assert "UndefinedError" in caplog.text
|
|
|
|
hass.states.async_set("switch.data_system", "cow", {"opmode": 0})
|
|
|
|
caplog.clear()
|
|
|
|
assert "UndefinedError" not in caplog.text
|
|
|
|
|
|
async def test_track_template_result(hass):
|
|
"""Test tracking template."""
|
|
specific_runs = []
|
|
wildcard_runs = []
|
|
wildercard_runs = []
|
|
|
|
template_condition = Template("{{states.sensor.test.state}}", hass)
|
|
template_condition_var = Template(
|
|
"{{(states.sensor.test.state|int) + test }}", hass
|
|
)
|
|
|
|
def specific_run_callback(event, updates):
|
|
track_result = updates.pop()
|
|
specific_runs.append(int(track_result.result))
|
|
|
|
async_track_template_result(
|
|
hass, [TrackTemplate(template_condition, None)], specific_run_callback
|
|
)
|
|
|
|
@ha.callback
|
|
def wildcard_run_callback(event, updates):
|
|
track_result = updates.pop()
|
|
wildcard_runs.append(
|
|
(int(track_result.last_result or 0), int(track_result.result))
|
|
)
|
|
|
|
async_track_template_result(
|
|
hass, [TrackTemplate(template_condition, None)], wildcard_run_callback
|
|
)
|
|
|
|
async def wildercard_run_callback(event, updates):
|
|
track_result = updates.pop()
|
|
wildercard_runs.append(
|
|
(int(track_result.last_result or 0), int(track_result.result))
|
|
)
|
|
|
|
async_track_template_result(
|
|
hass,
|
|
[TrackTemplate(template_condition_var, {"test": 5})],
|
|
wildercard_run_callback,
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.states.async_set("sensor.test", 5)
|
|
await hass.async_block_till_done()
|
|
|
|
assert specific_runs == [5]
|
|
assert wildcard_runs == [(0, 5)]
|
|
assert wildercard_runs == [(0, 10)]
|
|
|
|
hass.states.async_set("sensor.test", 30)
|
|
await hass.async_block_till_done()
|
|
|
|
assert specific_runs == [5, 30]
|
|
assert wildcard_runs == [(0, 5), (5, 30)]
|
|
assert wildercard_runs == [(0, 10), (10, 35)]
|
|
|
|
hass.states.async_set("sensor.test", 30)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 2
|
|
assert len(wildcard_runs) == 2
|
|
assert len(wildercard_runs) == 2
|
|
|
|
hass.states.async_set("sensor.test", 5)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 3
|
|
assert len(wildcard_runs) == 3
|
|
assert len(wildercard_runs) == 3
|
|
|
|
hass.states.async_set("sensor.test", 5)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 3
|
|
assert len(wildcard_runs) == 3
|
|
assert len(wildercard_runs) == 3
|
|
|
|
hass.states.async_set("sensor.test", 20)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(specific_runs) == 4
|
|
assert len(wildcard_runs) == 4
|
|
assert len(wildercard_runs) == 4
|
|
|
|
|
|
async def test_track_template_result_complex(hass):
|
|
"""Test tracking template."""
|
|
specific_runs = []
|
|
template_complex_str = """
|
|
{% if states("sensor.domain") == "light" %}
|
|
{{ states.light | map(attribute='entity_id') | list }}
|
|
{% elif states("sensor.domain") == "lock" %}
|
|
{{ states.lock | map(attribute='entity_id') | list }}
|
|
{% elif states("sensor.domain") == "single_binary_sensor" %}
|
|
{{ states("binary_sensor.single") }}
|
|
{% else %}
|
|
{{ states | map(attribute='entity_id') | list }}
|
|
{% endif %}
|
|
|
|
"""
|
|
template_complex = Template(template_complex_str, hass)
|
|
|
|
def specific_run_callback(event, updates):
|
|
specific_runs.append(updates.pop().result)
|
|
|
|
hass.states.async_set("light.one", "on")
|
|
hass.states.async_set("lock.one", "locked")
|
|
|
|
info = async_track_template_result(
|
|
hass,
|
|
[TrackTemplate(template_complex, None, timedelta(seconds=0))],
|
|
specific_run_callback,
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert info.listeners == {"all": True, "domains": set(), "entities": set()}
|
|
|
|
hass.states.async_set("sensor.domain", "light")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert specific_runs[0].strip() == "['light.one']"
|
|
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"light"},
|
|
"entities": {"sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("sensor.domain", "lock")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
assert specific_runs[1].strip() == "['lock.one']"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"lock"},
|
|
"entities": {"sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("sensor.domain", "all")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 3
|
|
assert "light.one" in specific_runs[2]
|
|
assert "lock.one" in specific_runs[2]
|
|
assert "sensor.domain" in specific_runs[2]
|
|
assert info.listeners == {"all": True, "domains": set(), "entities": set()}
|
|
|
|
hass.states.async_set("sensor.domain", "light")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 4
|
|
assert specific_runs[3].strip() == "['light.one']"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"light"},
|
|
"entities": {"sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("light.two", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 5
|
|
assert "light.one" in specific_runs[4]
|
|
assert "light.two" in specific_runs[4]
|
|
assert "sensor.domain" not in specific_runs[4]
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"light"},
|
|
"entities": {"sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("light.three", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 6
|
|
assert "light.one" in specific_runs[5]
|
|
assert "light.two" in specific_runs[5]
|
|
assert "light.three" in specific_runs[5]
|
|
assert "sensor.domain" not in specific_runs[5]
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"light"},
|
|
"entities": {"sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("sensor.domain", "lock")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 7
|
|
assert specific_runs[6].strip() == "['lock.one']"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"lock"},
|
|
"entities": {"sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("sensor.domain", "single_binary_sensor")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 8
|
|
assert specific_runs[7].strip() == "unknown"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": set(),
|
|
"entities": {"binary_sensor.single", "sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("binary_sensor.single", "binary_sensor_on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 9
|
|
assert specific_runs[8].strip() == "binary_sensor_on"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": set(),
|
|
"entities": {"binary_sensor.single", "sensor.domain"},
|
|
}
|
|
|
|
hass.states.async_set("sensor.domain", "lock")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 10
|
|
assert specific_runs[9].strip() == "['lock.one']"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"lock"},
|
|
"entities": {"sensor.domain"},
|
|
}
|
|
|
|
|
|
async def test_track_template_result_with_wildcard(hass):
|
|
"""Test tracking template with a wildcard."""
|
|
specific_runs = []
|
|
template_complex_str = r"""
|
|
|
|
{% for state in states %}
|
|
{% if state.entity_id | regex_match('.*\.office_') %}
|
|
{{ state.entity_id }}={{ state.state }}
|
|
{% endif %}
|
|
{% endfor %}
|
|
|
|
"""
|
|
template_complex = Template(template_complex_str, hass)
|
|
|
|
def specific_run_callback(event, updates):
|
|
specific_runs.append(updates.pop().result)
|
|
|
|
hass.states.async_set("cover.office_drapes", "closed")
|
|
hass.states.async_set("cover.office_window", "closed")
|
|
hass.states.async_set("cover.office_skylight", "open")
|
|
|
|
info = async_track_template_result(
|
|
hass, [TrackTemplate(template_complex, None)], specific_run_callback
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.states.async_set("cover.office_window", "open")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert info.listeners == {"all": True, "domains": set(), "entities": set()}
|
|
|
|
assert "cover.office_drapes=closed" in specific_runs[0]
|
|
assert "cover.office_window=open" in specific_runs[0]
|
|
assert "cover.office_skylight=open" in specific_runs[0]
|
|
|
|
|
|
async def test_track_template_result_with_group(hass):
|
|
"""Test tracking template with a group."""
|
|
hass.states.async_set("sensor.power_1", 0)
|
|
hass.states.async_set("sensor.power_2", 200.2)
|
|
hass.states.async_set("sensor.power_3", 400.4)
|
|
hass.states.async_set("sensor.power_4", 800.8)
|
|
|
|
assert await async_setup_component(
|
|
hass,
|
|
"group",
|
|
{"group": {"power_sensors": "sensor.power_1,sensor.power_2,sensor.power_3"}},
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert hass.states.get("group.power_sensors")
|
|
assert hass.states.get("group.power_sensors").state
|
|
|
|
specific_runs = []
|
|
template_complex_str = r"""
|
|
|
|
{{ states.group.power_sensors.attributes.entity_id | expand | map(attribute='state')|map('float')|sum }}
|
|
|
|
"""
|
|
template_complex = Template(template_complex_str, hass)
|
|
|
|
def specific_run_callback(event, updates):
|
|
specific_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass, [TrackTemplate(template_complex, None)], specific_run_callback
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": set(),
|
|
"entities": {
|
|
"group.power_sensors",
|
|
"sensor.power_1",
|
|
"sensor.power_2",
|
|
"sensor.power_3",
|
|
},
|
|
}
|
|
|
|
hass.states.async_set("sensor.power_1", 100.1)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
assert specific_runs[0] == str(100.1 + 200.2 + 400.4)
|
|
|
|
hass.states.async_set("sensor.power_3", 0)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
assert specific_runs[1] == str(100.1 + 200.2 + 0)
|
|
|
|
with patch(
|
|
"homeassistant.config.load_yaml_config_file",
|
|
return_value={
|
|
"group": {
|
|
"power_sensors": "sensor.power_1,sensor.power_2,sensor.power_3,sensor.power_4",
|
|
}
|
|
},
|
|
):
|
|
await hass.services.async_call("group", "reload")
|
|
await hass.async_block_till_done()
|
|
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
assert specific_runs[-1] == str(100.1 + 200.2 + 0 + 800.8)
|
|
|
|
|
|
async def test_track_template_result_and_conditional(hass):
|
|
"""Test tracking template with an and conditional."""
|
|
specific_runs = []
|
|
hass.states.async_set("light.a", "off")
|
|
hass.states.async_set("light.b", "off")
|
|
template_str = '{% if states.light.a.state == "on" and states.light.b.state == "on" %}on{% else %}off{% endif %}'
|
|
|
|
template = Template(template_str, hass)
|
|
|
|
def specific_run_callback(event, updates):
|
|
specific_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass, [TrackTemplate(template, None)], specific_run_callback
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert info.listeners == {"all": False, "domains": set(), "entities": {"light.a"}}
|
|
|
|
hass.states.async_set("light.b", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
|
|
hass.states.async_set("light.a", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert specific_runs[0] == "on"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": set(),
|
|
"entities": {"light.a", "light.b"},
|
|
}
|
|
|
|
hass.states.async_set("light.b", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
assert specific_runs[1] == "off"
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": set(),
|
|
"entities": {"light.a", "light.b"},
|
|
}
|
|
|
|
hass.states.async_set("light.a", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
hass.states.async_set("light.b", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
hass.states.async_set("light.a", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 3
|
|
assert specific_runs[2] == "on"
|
|
|
|
|
|
async def test_track_template_result_iterator(hass):
|
|
"""Test tracking template."""
|
|
iterator_runs = []
|
|
|
|
@ha.callback
|
|
def iterator_callback(event, updates):
|
|
iterator_runs.append(updates.pop().result)
|
|
|
|
async_track_template_result(
|
|
hass,
|
|
[
|
|
TrackTemplate(
|
|
Template(
|
|
"""
|
|
{% for state in states.sensor %}
|
|
{% if state.state == 'on' %}
|
|
{{ state.entity_id }},
|
|
{% endif %}
|
|
{% endfor %}
|
|
""",
|
|
hass,
|
|
),
|
|
None,
|
|
timedelta(seconds=0),
|
|
)
|
|
],
|
|
iterator_callback,
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.states.async_set("sensor.test", 5)
|
|
await hass.async_block_till_done()
|
|
|
|
assert iterator_runs == [""]
|
|
|
|
filter_runs = []
|
|
|
|
@ha.callback
|
|
def filter_callback(event, updates):
|
|
filter_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass,
|
|
[
|
|
TrackTemplate(
|
|
Template(
|
|
"""{{ states.sensor|selectattr("state","equalto","on")
|
|
|join(",", attribute="entity_id") }}""",
|
|
hass,
|
|
),
|
|
None,
|
|
timedelta(seconds=0),
|
|
)
|
|
],
|
|
filter_callback,
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert info.listeners == {
|
|
"all": False,
|
|
"domains": {"sensor"},
|
|
"entities": {"sensor.test"},
|
|
}
|
|
|
|
hass.states.async_set("sensor.test", 6)
|
|
await hass.async_block_till_done()
|
|
|
|
assert filter_runs == [""]
|
|
assert iterator_runs == [""]
|
|
|
|
hass.states.async_set("sensor.new", "on")
|
|
await hass.async_block_till_done()
|
|
assert iterator_runs == ["", "sensor.new,"]
|
|
assert filter_runs == ["", "sensor.new"]
|
|
|
|
|
|
async def test_track_template_result_errors(hass, caplog):
|
|
"""Test tracking template with errors in the template."""
|
|
template_syntax_error = Template("{{states.switch", hass)
|
|
|
|
template_not_exist = Template("{{states.switch.not_exist.state }}", hass)
|
|
|
|
syntax_error_runs = []
|
|
not_exist_runs = []
|
|
|
|
@ha.callback
|
|
def syntax_error_listener(event, updates):
|
|
track_result = updates.pop()
|
|
syntax_error_runs.append(
|
|
(
|
|
event,
|
|
track_result.template,
|
|
track_result.last_result,
|
|
track_result.result,
|
|
)
|
|
)
|
|
|
|
async_track_template_result(
|
|
hass, [TrackTemplate(template_syntax_error, None)], syntax_error_listener
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(syntax_error_runs) == 0
|
|
assert "TemplateSyntaxError" in caplog.text
|
|
|
|
@ha.callback
|
|
def not_exist_runs_error_listener(event, updates):
|
|
template_track = updates.pop()
|
|
not_exist_runs.append(
|
|
(
|
|
event,
|
|
template_track.template,
|
|
template_track.last_result,
|
|
template_track.result,
|
|
)
|
|
)
|
|
|
|
async_track_template_result(
|
|
hass,
|
|
[TrackTemplate(template_not_exist, None)],
|
|
not_exist_runs_error_listener,
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(syntax_error_runs) == 0
|
|
assert len(not_exist_runs) == 0
|
|
|
|
hass.states.async_set("switch.not_exist", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(not_exist_runs) == 1
|
|
assert not_exist_runs[0][0].data.get("entity_id") == "switch.not_exist"
|
|
assert not_exist_runs[0][1] == template_not_exist
|
|
assert not_exist_runs[0][2] is None
|
|
assert not_exist_runs[0][3] == "off"
|
|
|
|
hass.states.async_set("switch.not_exist", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(syntax_error_runs) == 1
|
|
assert len(not_exist_runs) == 2
|
|
assert not_exist_runs[1][0].data.get("entity_id") == "switch.not_exist"
|
|
assert not_exist_runs[1][1] == template_not_exist
|
|
assert not_exist_runs[1][2] == "off"
|
|
assert not_exist_runs[1][3] == "on"
|
|
|
|
with patch.object(Template, "async_render") as render:
|
|
render.side_effect = TemplateError(jinja2.TemplateError())
|
|
|
|
hass.states.async_set("switch.not_exist", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(not_exist_runs) == 3
|
|
assert not_exist_runs[2][0].data.get("entity_id") == "switch.not_exist"
|
|
assert not_exist_runs[2][1] == template_not_exist
|
|
assert not_exist_runs[2][2] == "on"
|
|
assert isinstance(not_exist_runs[2][3], TemplateError)
|
|
|
|
|
|
async def test_static_string(hass):
|
|
"""Test a static string."""
|
|
template_refresh = Template("{{ 'static' }}", hass)
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass, [TrackTemplate(template_refresh, None)], refresh_listener
|
|
)
|
|
await hass.async_block_till_done()
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == ["static"]
|
|
|
|
|
|
async def test_track_template_rate_limit(hass):
|
|
"""Test template rate limit."""
|
|
template_refresh = Template("{{ states | count }}", hass)
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass,
|
|
[TrackTemplate(template_refresh, None, timedelta(seconds=0.1))],
|
|
refresh_listener,
|
|
)
|
|
await hass.async_block_till_done()
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == ["0"]
|
|
hass.states.async_set("sensor.one", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0"]
|
|
info.async_refresh()
|
|
assert refresh_runs == ["0", "1"]
|
|
hass.states.async_set("sensor.two", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1"]
|
|
next_time = dt_util.utcnow() + timedelta(seconds=0.125)
|
|
with patch(
|
|
"homeassistant.helpers.ratelimit.dt_util.utcnow", return_value=next_time
|
|
):
|
|
async_fire_time_changed(hass, next_time)
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1", "2"]
|
|
hass.states.async_set("sensor.three", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1", "2"]
|
|
hass.states.async_set("sensor.four", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1", "2"]
|
|
next_time = dt_util.utcnow() + timedelta(seconds=0.125 * 2)
|
|
with patch(
|
|
"homeassistant.helpers.ratelimit.dt_util.utcnow", return_value=next_time
|
|
):
|
|
async_fire_time_changed(hass, next_time)
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1", "2", "4"]
|
|
hass.states.async_set("sensor.five", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1", "2", "4"]
|
|
|
|
|
|
async def test_track_template_rate_limit_five(hass):
|
|
"""Test template rate limit of 5 seconds."""
|
|
template_refresh = Template("{{ states | count }}", hass)
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass,
|
|
[TrackTemplate(template_refresh, None, timedelta(seconds=5))],
|
|
refresh_listener,
|
|
)
|
|
await hass.async_block_till_done()
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == ["0"]
|
|
hass.states.async_set("sensor.one", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0"]
|
|
info.async_refresh()
|
|
assert refresh_runs == ["0", "1"]
|
|
hass.states.async_set("sensor.two", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1"]
|
|
hass.states.async_set("sensor.three", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["0", "1"]
|
|
|
|
|
|
async def test_specifically_referenced_entity_is_not_rate_limited(hass):
|
|
"""Test template rate limit of 5 seconds."""
|
|
hass.states.async_set("sensor.one", "none")
|
|
|
|
template_refresh = Template('{{ states | count }}_{{ states("sensor.one") }}', hass)
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass,
|
|
[TrackTemplate(template_refresh, None, timedelta(seconds=5))],
|
|
refresh_listener,
|
|
)
|
|
await hass.async_block_till_done()
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == ["1_none"]
|
|
hass.states.async_set("sensor.one", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["1_none", "1_any"]
|
|
info.async_refresh()
|
|
assert refresh_runs == ["1_none", "1_any"]
|
|
hass.states.async_set("sensor.two", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["1_none", "1_any"]
|
|
hass.states.async_set("sensor.three", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["1_none", "1_any"]
|
|
hass.states.async_set("sensor.one", "none")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["1_none", "1_any", "3_none"]
|
|
|
|
|
|
async def test_track_two_templates_with_different_rate_limits(hass):
|
|
"""Test two templates with different rate limits."""
|
|
template_one = Template("{{ states | count }} ", hass)
|
|
template_five = Template("{{ states | count }}", hass)
|
|
|
|
refresh_runs = {
|
|
template_one: [],
|
|
template_five: [],
|
|
}
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
for update in updates:
|
|
refresh_runs[update.template].append(update.result)
|
|
|
|
info = async_track_template_result(
|
|
hass,
|
|
[
|
|
TrackTemplate(template_one, None, timedelta(seconds=0.1)),
|
|
TrackTemplate(template_five, None, timedelta(seconds=5)),
|
|
],
|
|
refresh_listener,
|
|
)
|
|
|
|
await hass.async_block_till_done()
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs[template_one] == ["0"]
|
|
assert refresh_runs[template_five] == ["0"]
|
|
hass.states.async_set("sensor.one", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs[template_one] == ["0"]
|
|
assert refresh_runs[template_five] == ["0"]
|
|
info.async_refresh()
|
|
assert refresh_runs[template_one] == ["0", "1"]
|
|
assert refresh_runs[template_five] == ["0", "1"]
|
|
hass.states.async_set("sensor.two", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs[template_one] == ["0", "1"]
|
|
assert refresh_runs[template_five] == ["0", "1"]
|
|
next_time = dt_util.utcnow() + timedelta(seconds=0.125 * 1)
|
|
with patch(
|
|
"homeassistant.helpers.ratelimit.dt_util.utcnow", return_value=next_time
|
|
):
|
|
async_fire_time_changed(hass, next_time)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs[template_one] == ["0", "1", "2"]
|
|
assert refresh_runs[template_five] == ["0", "1"]
|
|
hass.states.async_set("sensor.three", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs[template_one] == ["0", "1", "2"]
|
|
assert refresh_runs[template_five] == ["0", "1"]
|
|
hass.states.async_set("sensor.four", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs[template_one] == ["0", "1", "2"]
|
|
assert refresh_runs[template_five] == ["0", "1"]
|
|
hass.states.async_set("sensor.five", "any")
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs[template_one] == ["0", "1", "2"]
|
|
assert refresh_runs[template_five] == ["0", "1"]
|
|
|
|
|
|
async def test_string(hass):
|
|
"""Test a string."""
|
|
template_refresh = Template("no_template", hass)
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass, [TrackTemplate(template_refresh, None)], refresh_listener
|
|
)
|
|
await hass.async_block_till_done()
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == ["no_template"]
|
|
|
|
|
|
async def test_track_template_result_refresh_cancel(hass):
|
|
"""Test cancelling and refreshing result."""
|
|
template_refresh = Template("{{states.switch.test.state == 'on' and now() }}", hass)
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates.pop().result)
|
|
|
|
info = async_track_template_result(
|
|
hass, [TrackTemplate(template_refresh, None)], refresh_listener
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.states.async_set("switch.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == ["False"]
|
|
|
|
assert len(refresh_runs) == 1
|
|
|
|
info.async_refresh()
|
|
hass.states.async_set("switch.test", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(refresh_runs) == 2
|
|
assert refresh_runs[0] != refresh_runs[1]
|
|
|
|
info.async_remove()
|
|
hass.states.async_set("switch.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(refresh_runs) == 2
|
|
|
|
template_refresh = Template("{{ value }}", hass)
|
|
refresh_runs = []
|
|
|
|
info = async_track_template_result(
|
|
hass,
|
|
[TrackTemplate(template_refresh, {"value": "duck"})],
|
|
refresh_listener,
|
|
)
|
|
await hass.async_block_till_done()
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == ["duck"]
|
|
|
|
info.async_refresh()
|
|
await hass.async_block_till_done()
|
|
assert refresh_runs == ["duck"]
|
|
|
|
|
|
async def test_async_track_template_result_multiple_templates(hass):
|
|
"""Test tracking multiple templates."""
|
|
|
|
template_1 = Template("{{ states.switch.test.state == 'on' }}")
|
|
template_2 = Template("{{ states.switch.test.state == 'on' }}")
|
|
template_3 = Template("{{ states.switch.test.state == 'off' }}")
|
|
template_4 = Template(
|
|
"{{ states.binary_sensor | map(attribute='entity_id') | list }}"
|
|
)
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates)
|
|
|
|
async_track_template_result(
|
|
hass,
|
|
[
|
|
TrackTemplate(template_1, None),
|
|
TrackTemplate(template_2, None),
|
|
TrackTemplate(template_3, None),
|
|
TrackTemplate(template_4, None),
|
|
],
|
|
refresh_listener,
|
|
)
|
|
|
|
hass.states.async_set("switch.test", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == [
|
|
[
|
|
TrackTemplateResult(template_1, None, "True"),
|
|
TrackTemplateResult(template_2, None, "True"),
|
|
TrackTemplateResult(template_3, None, "False"),
|
|
]
|
|
]
|
|
|
|
refresh_runs = []
|
|
hass.states.async_set("switch.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == [
|
|
[
|
|
TrackTemplateResult(template_1, "True", "False"),
|
|
TrackTemplateResult(template_2, "True", "False"),
|
|
TrackTemplateResult(template_3, "False", "True"),
|
|
]
|
|
]
|
|
|
|
refresh_runs = []
|
|
hass.states.async_set("binary_sensor.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == [
|
|
[TrackTemplateResult(template_4, None, "['binary_sensor.test']")]
|
|
]
|
|
|
|
|
|
async def test_async_track_template_result_multiple_templates_mixing_domain(hass):
|
|
"""Test tracking multiple templates when tracking entities and an entire domain."""
|
|
|
|
template_1 = Template("{{ states.switch.test.state == 'on' }}")
|
|
template_2 = Template("{{ states.switch.test.state == 'on' }}")
|
|
template_3 = Template("{{ states.switch.test.state == 'off' }}")
|
|
template_4 = Template("{{ states.switch | map(attribute='entity_id') | list }}")
|
|
|
|
refresh_runs = []
|
|
|
|
@ha.callback
|
|
def refresh_listener(event, updates):
|
|
refresh_runs.append(updates)
|
|
|
|
async_track_template_result(
|
|
hass,
|
|
[
|
|
TrackTemplate(template_1, None),
|
|
TrackTemplate(template_2, None),
|
|
TrackTemplate(template_3, None),
|
|
TrackTemplate(template_4, None, timedelta(seconds=0)),
|
|
],
|
|
refresh_listener,
|
|
)
|
|
|
|
hass.states.async_set("switch.test", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == [
|
|
[
|
|
TrackTemplateResult(template_1, None, "True"),
|
|
TrackTemplateResult(template_2, None, "True"),
|
|
TrackTemplateResult(template_3, None, "False"),
|
|
TrackTemplateResult(template_4, None, "['switch.test']"),
|
|
]
|
|
]
|
|
|
|
refresh_runs = []
|
|
hass.states.async_set("switch.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == [
|
|
[
|
|
TrackTemplateResult(template_1, "True", "False"),
|
|
TrackTemplateResult(template_2, "True", "False"),
|
|
TrackTemplateResult(template_3, "False", "True"),
|
|
]
|
|
]
|
|
|
|
refresh_runs = []
|
|
hass.states.async_set("binary_sensor.test", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == []
|
|
|
|
refresh_runs = []
|
|
hass.states.async_set("switch.new", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert refresh_runs == [
|
|
[
|
|
TrackTemplateResult(
|
|
template_4, "['switch.test']", "['switch.new', 'switch.test']"
|
|
)
|
|
]
|
|
]
|
|
|
|
|
|
async def test_async_track_template_result_raise_on_template_error(hass):
|
|
"""Test that we raise as soon as we encounter a failed template."""
|
|
|
|
with pytest.raises(TemplateError):
|
|
async_track_template_result(
|
|
hass,
|
|
[
|
|
TrackTemplate(
|
|
Template(
|
|
"{{ states.switch | function_that_does_not_exist | list }}"
|
|
),
|
|
None,
|
|
),
|
|
],
|
|
ha.callback(lambda event, updates: None),
|
|
raise_on_template_error=True,
|
|
)
|
|
|
|
|
|
async def test_track_same_state_simple_no_trigger(hass):
|
|
"""Test track_same_change with no trigger."""
|
|
callback_runs = []
|
|
period = timedelta(minutes=1)
|
|
|
|
@ha.callback
|
|
def callback_run_callback():
|
|
callback_runs.append(1)
|
|
|
|
async_track_same_state(
|
|
hass,
|
|
period,
|
|
callback_run_callback,
|
|
callback(lambda _, _2, to_s: to_s.state == "on"),
|
|
entity_ids="light.Bowl",
|
|
)
|
|
|
|
# Adding state to state machine
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
assert len(callback_runs) == 0
|
|
|
|
# Change state on state machine
|
|
hass.states.async_set("light.Bowl", "off")
|
|
await hass.async_block_till_done()
|
|
assert len(callback_runs) == 0
|
|
|
|
# change time to track and see if they trigger
|
|
future = dt_util.utcnow() + period
|
|
async_fire_time_changed(hass, future)
|
|
await hass.async_block_till_done()
|
|
assert len(callback_runs) == 0
|
|
|
|
|
|
async def test_track_same_state_simple_trigger_check_funct(hass):
|
|
"""Test track_same_change with trigger and check funct."""
|
|
callback_runs = []
|
|
check_func = []
|
|
period = timedelta(minutes=1)
|
|
|
|
@ha.callback
|
|
def callback_run_callback():
|
|
callback_runs.append(1)
|
|
|
|
@ha.callback
|
|
def async_check_func(entity, from_s, to_s):
|
|
check_func.append((entity, from_s, to_s))
|
|
return True
|
|
|
|
async_track_same_state(
|
|
hass,
|
|
period,
|
|
callback_run_callback,
|
|
entity_ids="light.Bowl",
|
|
async_check_same_func=async_check_func,
|
|
)
|
|
|
|
# Adding state to state machine
|
|
hass.states.async_set("light.Bowl", "on")
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
assert len(callback_runs) == 0
|
|
assert check_func[-1][2].state == "on"
|
|
assert check_func[-1][0] == "light.bowl"
|
|
|
|
# change time to track and see if they trigger
|
|
future = dt_util.utcnow() + period
|
|
async_fire_time_changed(hass, future)
|
|
await hass.async_block_till_done()
|
|
assert len(callback_runs) == 1
|
|
|
|
|
|
async def test_track_time_interval(hass):
|
|
"""Test tracking time interval."""
|
|
specific_runs = []
|
|
|
|
utc_now = dt_util.utcnow()
|
|
unsub = async_track_time_interval(
|
|
hass, callback(lambda x: specific_runs.append(x)), timedelta(seconds=10)
|
|
)
|
|
|
|
async_fire_time_changed(hass, utc_now + timedelta(seconds=5))
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
|
|
async_fire_time_changed(hass, utc_now + timedelta(seconds=13))
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(hass, utc_now + timedelta(minutes=20))
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
unsub()
|
|
|
|
async_fire_time_changed(hass, utc_now + timedelta(seconds=30))
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
|
|
async def test_track_sunrise(hass, legacy_patchable_time):
|
|
"""Test track the sunrise."""
|
|
latitude = 32.87336
|
|
longitude = 117.22743
|
|
|
|
# Setup sun component
|
|
hass.config.latitude = latitude
|
|
hass.config.longitude = longitude
|
|
assert await async_setup_component(
|
|
hass, sun.DOMAIN, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}}
|
|
)
|
|
|
|
# Get next sunrise/sunset
|
|
astral = Astral()
|
|
utc_now = datetime(2014, 5, 24, 12, 0, 0, tzinfo=dt_util.UTC)
|
|
utc_today = utc_now.date()
|
|
|
|
mod = -1
|
|
while True:
|
|
next_rising = astral.sunrise_utc(
|
|
utc_today + timedelta(days=mod), latitude, longitude
|
|
)
|
|
if next_rising > utc_now:
|
|
break
|
|
mod += 1
|
|
|
|
# Track sunrise
|
|
runs = []
|
|
with patch("homeassistant.util.dt.utcnow", return_value=utc_now):
|
|
unsub = async_track_sunrise(hass, callback(lambda: runs.append(1)))
|
|
|
|
offset_runs = []
|
|
offset = timedelta(minutes=30)
|
|
with patch("homeassistant.util.dt.utcnow", return_value=utc_now):
|
|
unsub2 = async_track_sunrise(
|
|
hass, callback(lambda: offset_runs.append(1)), offset
|
|
)
|
|
|
|
# run tests
|
|
async_fire_time_changed(hass, next_rising - offset)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 0
|
|
assert len(offset_runs) == 0
|
|
|
|
async_fire_time_changed(hass, next_rising)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
assert len(offset_runs) == 0
|
|
|
|
async_fire_time_changed(hass, next_rising + offset)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
assert len(offset_runs) == 1
|
|
|
|
unsub()
|
|
unsub2()
|
|
|
|
async_fire_time_changed(hass, next_rising + offset)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
assert len(offset_runs) == 1
|
|
|
|
|
|
async def test_track_sunrise_update_location(hass, legacy_patchable_time):
|
|
"""Test track the sunrise."""
|
|
# Setup sun component
|
|
hass.config.latitude = 32.87336
|
|
hass.config.longitude = 117.22743
|
|
assert await async_setup_component(
|
|
hass, sun.DOMAIN, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}}
|
|
)
|
|
|
|
# Get next sunrise
|
|
astral = Astral()
|
|
utc_now = datetime(2014, 5, 24, 12, 0, 0, tzinfo=dt_util.UTC)
|
|
utc_today = utc_now.date()
|
|
|
|
mod = -1
|
|
while True:
|
|
next_rising = astral.sunrise_utc(
|
|
utc_today + timedelta(days=mod), hass.config.latitude, hass.config.longitude
|
|
)
|
|
if next_rising > utc_now:
|
|
break
|
|
mod += 1
|
|
|
|
# Track sunrise
|
|
runs = []
|
|
with patch("homeassistant.util.dt.utcnow", return_value=utc_now):
|
|
async_track_sunrise(hass, callback(lambda: runs.append(1)))
|
|
|
|
# Mimic sunrise
|
|
async_fire_time_changed(hass, next_rising)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
|
|
# Move!
|
|
with patch("homeassistant.util.dt.utcnow", return_value=utc_now):
|
|
await hass.config.async_update(latitude=40.755931, longitude=-73.984606)
|
|
await hass.async_block_till_done()
|
|
|
|
# Mimic sunrise
|
|
async_fire_time_changed(hass, next_rising)
|
|
await hass.async_block_till_done()
|
|
# Did not increase
|
|
assert len(runs) == 1
|
|
|
|
# Get next sunrise
|
|
mod = -1
|
|
while True:
|
|
next_rising = astral.sunrise_utc(
|
|
utc_today + timedelta(days=mod), hass.config.latitude, hass.config.longitude
|
|
)
|
|
if next_rising > utc_now:
|
|
break
|
|
mod += 1
|
|
|
|
# Mimic sunrise at new location
|
|
async_fire_time_changed(hass, next_rising)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 2
|
|
|
|
|
|
async def test_track_sunset(hass, legacy_patchable_time):
|
|
"""Test track the sunset."""
|
|
latitude = 32.87336
|
|
longitude = 117.22743
|
|
|
|
# Setup sun component
|
|
hass.config.latitude = latitude
|
|
hass.config.longitude = longitude
|
|
assert await async_setup_component(
|
|
hass, sun.DOMAIN, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}}
|
|
)
|
|
|
|
# Get next sunrise/sunset
|
|
astral = Astral()
|
|
utc_now = datetime(2014, 5, 24, 12, 0, 0, tzinfo=dt_util.UTC)
|
|
utc_today = utc_now.date()
|
|
|
|
mod = -1
|
|
while True:
|
|
next_setting = astral.sunset_utc(
|
|
utc_today + timedelta(days=mod), latitude, longitude
|
|
)
|
|
if next_setting > utc_now:
|
|
break
|
|
mod += 1
|
|
|
|
# Track sunset
|
|
runs = []
|
|
with patch("homeassistant.util.dt.utcnow", return_value=utc_now):
|
|
unsub = async_track_sunset(hass, callback(lambda: runs.append(1)))
|
|
|
|
offset_runs = []
|
|
offset = timedelta(minutes=30)
|
|
with patch("homeassistant.util.dt.utcnow", return_value=utc_now):
|
|
unsub2 = async_track_sunset(
|
|
hass, callback(lambda: offset_runs.append(1)), offset
|
|
)
|
|
|
|
# Run tests
|
|
async_fire_time_changed(hass, next_setting - offset)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 0
|
|
assert len(offset_runs) == 0
|
|
|
|
async_fire_time_changed(hass, next_setting)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
assert len(offset_runs) == 0
|
|
|
|
async_fire_time_changed(hass, next_setting + offset)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
assert len(offset_runs) == 1
|
|
|
|
unsub()
|
|
unsub2()
|
|
|
|
async_fire_time_changed(hass, next_setting + offset)
|
|
await hass.async_block_till_done()
|
|
assert len(runs) == 1
|
|
assert len(offset_runs) == 1
|
|
|
|
|
|
async def test_async_track_time_change(hass):
|
|
"""Test tracking time change."""
|
|
wildcard_runs = []
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
|
|
time_that_will_not_match_right_away = datetime(
|
|
now.year + 1, 5, 24, 11, 59, 55, tzinfo=dt_util.UTC
|
|
)
|
|
|
|
with patch(
|
|
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
|
|
):
|
|
unsub = async_track_time_change(
|
|
hass, callback(lambda x: wildcard_runs.append(x))
|
|
)
|
|
unsub_utc = async_track_utc_time_change(
|
|
hass, callback(lambda x: specific_runs.append(x)), second=[0, 30]
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 0, 15, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
assert len(wildcard_runs) == 2
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 0, 30, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
assert len(wildcard_runs) == 3
|
|
|
|
unsub()
|
|
unsub_utc()
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 0, 30, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
assert len(wildcard_runs) == 3
|
|
|
|
|
|
async def test_periodic_task_minute(hass):
|
|
"""Test periodic tasks per minute."""
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
|
|
time_that_will_not_match_right_away = datetime(
|
|
now.year + 1, 5, 24, 11, 59, 55, tzinfo=dt_util.UTC
|
|
)
|
|
|
|
with patch(
|
|
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
|
|
):
|
|
unsub = async_track_utc_time_change(
|
|
hass, callback(lambda x: specific_runs.append(x)), minute="/5", second=0
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 3, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 5, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
unsub()
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 12, 5, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
|
|
async def test_periodic_task_hour(hass):
|
|
"""Test periodic tasks per hour."""
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
|
|
time_that_will_not_match_right_away = datetime(
|
|
now.year + 1, 5, 24, 21, 59, 55, tzinfo=dt_util.UTC
|
|
)
|
|
|
|
with patch(
|
|
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
|
|
):
|
|
unsub = async_track_utc_time_change(
|
|
hass,
|
|
callback(lambda x: specific_runs.append(x)),
|
|
hour="/2",
|
|
minute=0,
|
|
second=0,
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 23, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 25, 0, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 25, 1, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 25, 2, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 3
|
|
|
|
unsub()
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 25, 2, 0, 0, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 3
|
|
|
|
|
|
async def test_periodic_task_wrong_input(hass):
|
|
"""Test periodic tasks with wrong input."""
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
|
|
with pytest.raises(ValueError):
|
|
async_track_utc_time_change(
|
|
hass, callback(lambda x: specific_runs.append(x)), hour="/two"
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 2, 0, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
|
|
|
|
async def test_periodic_task_clock_rollback(hass):
|
|
"""Test periodic tasks with the time rolling backwards."""
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
|
|
time_that_will_not_match_right_away = datetime(
|
|
now.year + 1, 5, 24, 21, 59, 55, tzinfo=dt_util.UTC
|
|
)
|
|
|
|
with patch(
|
|
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
|
|
):
|
|
unsub = async_track_utc_time_change(
|
|
hass,
|
|
callback(lambda x: specific_runs.append(x)),
|
|
hour="/2",
|
|
minute=0,
|
|
second=0,
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 23, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass,
|
|
datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC),
|
|
fire_all=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
async_fire_time_changed(
|
|
hass,
|
|
datetime(now.year + 1, 5, 24, 0, 0, 0, 999999, tzinfo=dt_util.UTC),
|
|
fire_all=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 3
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 25, 2, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 4
|
|
|
|
unsub()
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 25, 2, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 4
|
|
|
|
|
|
async def test_periodic_task_duplicate_time(hass):
|
|
"""Test periodic tasks not triggering on duplicate time."""
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
|
|
time_that_will_not_match_right_away = datetime(
|
|
now.year + 1, 5, 24, 21, 59, 55, tzinfo=dt_util.UTC
|
|
)
|
|
|
|
with patch(
|
|
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
|
|
):
|
|
unsub = async_track_utc_time_change(
|
|
hass,
|
|
callback(lambda x: specific_runs.append(x)),
|
|
hour="/2",
|
|
minute=0,
|
|
second=0,
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass, datetime(now.year + 1, 5, 25, 0, 0, 0, 999999, tzinfo=dt_util.UTC)
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
unsub()
|
|
|
|
|
|
async def test_periodic_task_entering_dst(hass):
|
|
"""Test periodic task behavior when entering dst."""
|
|
timezone = dt_util.get_time_zone("Europe/Vienna")
|
|
dt_util.set_default_time_zone(timezone)
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
time_that_will_not_match_right_away = timezone.localize(
|
|
datetime(now.year + 1, 3, 25, 2, 31, 0)
|
|
)
|
|
|
|
with patch(
|
|
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
|
|
):
|
|
unsub = async_track_time_change(
|
|
hass,
|
|
callback(lambda x: specific_runs.append(x)),
|
|
hour=2,
|
|
minute=30,
|
|
second=0,
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass, timezone.localize(datetime(now.year + 1, 3, 25, 1, 50, 0, 999999))
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
|
|
async_fire_time_changed(
|
|
hass, timezone.localize(datetime(now.year + 1, 3, 25, 3, 50, 0, 999999))
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
|
|
async_fire_time_changed(
|
|
hass, timezone.localize(datetime(now.year + 1, 3, 26, 1, 50, 0, 999999))
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
|
|
async_fire_time_changed(
|
|
hass, timezone.localize(datetime(now.year + 1, 3, 26, 2, 50, 0, 999999))
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
unsub()
|
|
|
|
|
|
async def test_periodic_task_leaving_dst(hass):
|
|
"""Test periodic task behavior when leaving dst."""
|
|
timezone = dt_util.get_time_zone("Europe/Vienna")
|
|
dt_util.set_default_time_zone(timezone)
|
|
specific_runs = []
|
|
|
|
now = dt_util.utcnow()
|
|
|
|
time_that_will_not_match_right_away = timezone.localize(
|
|
datetime(now.year + 1, 10, 28, 2, 28, 0), is_dst=True
|
|
)
|
|
|
|
with patch(
|
|
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
|
|
):
|
|
unsub = async_track_time_change(
|
|
hass,
|
|
callback(lambda x: specific_runs.append(x)),
|
|
hour=2,
|
|
minute=30,
|
|
second=0,
|
|
)
|
|
|
|
async_fire_time_changed(
|
|
hass,
|
|
timezone.localize(
|
|
datetime(now.year + 1, 10, 28, 2, 5, 0, 999999), is_dst=False
|
|
),
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 0
|
|
|
|
async_fire_time_changed(
|
|
hass,
|
|
timezone.localize(
|
|
datetime(now.year + 1, 10, 28, 2, 55, 0, 999999), is_dst=False
|
|
),
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 1
|
|
|
|
async_fire_time_changed(
|
|
hass,
|
|
timezone.localize(
|
|
datetime(now.year + 2, 10, 28, 2, 45, 0, 999999), is_dst=True
|
|
),
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
async_fire_time_changed(
|
|
hass,
|
|
timezone.localize(
|
|
datetime(now.year + 2, 10, 28, 2, 55, 0, 999999), is_dst=True
|
|
),
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
async_fire_time_changed(
|
|
hass,
|
|
timezone.localize(
|
|
datetime(now.year + 2, 10, 28, 2, 55, 0, 999999), is_dst=True
|
|
),
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(specific_runs) == 2
|
|
|
|
unsub()
|
|
|
|
|
|
async def test_call_later(hass):
|
|
"""Test calling an action later."""
|
|
|
|
def action():
|
|
pass
|
|
|
|
now = datetime(2017, 12, 19, 15, 40, 0, tzinfo=dt_util.UTC)
|
|
|
|
with patch(
|
|
"homeassistant.helpers.event.async_track_point_in_utc_time"
|
|
) as mock, patch("homeassistant.util.dt.utcnow", return_value=now):
|
|
async_call_later(hass, 3, action)
|
|
|
|
assert len(mock.mock_calls) == 1
|
|
p_hass, p_action, p_point = mock.mock_calls[0][1]
|
|
assert p_hass is hass
|
|
assert p_action is action
|
|
assert p_point == now + timedelta(seconds=3)
|
|
|
|
|
|
async def test_async_call_later(hass):
|
|
"""Test calling an action later."""
|
|
|
|
def action():
|
|
pass
|
|
|
|
now = datetime(2017, 12, 19, 15, 40, 0, tzinfo=dt_util.UTC)
|
|
|
|
with patch(
|
|
"homeassistant.helpers.event.async_track_point_in_utc_time"
|
|
) as mock, patch("homeassistant.util.dt.utcnow", return_value=now):
|
|
remove = async_call_later(hass, 3, action)
|
|
|
|
assert len(mock.mock_calls) == 1
|
|
p_hass, p_action, p_point = mock.mock_calls[0][1]
|
|
assert p_hass is hass
|
|
assert p_action is action
|
|
assert p_point == now + timedelta(seconds=3)
|
|
assert remove is mock()
|
|
|
|
|
|
async def test_track_state_change_event_chain_multple_entity(hass):
|
|
"""Test that adding a new state tracker inside a tracker does not fire right away."""
|
|
tracker_called = []
|
|
chained_tracker_called = []
|
|
|
|
chained_tracker_unsub = []
|
|
tracker_unsub = []
|
|
|
|
@ha.callback
|
|
def chained_single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
chained_tracker_called.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
tracker_called.append((old_state, new_state))
|
|
|
|
chained_tracker_unsub.append(
|
|
async_track_state_change_event(
|
|
hass, ["light.bowl", "light.top"], chained_single_run_callback
|
|
)
|
|
)
|
|
|
|
tracker_unsub.append(
|
|
async_track_state_change_event(
|
|
hass, ["light.bowl", "light.top"], single_run_callback
|
|
)
|
|
)
|
|
|
|
hass.states.async_set("light.bowl", "on")
|
|
hass.states.async_set("light.top", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(tracker_called) == 2
|
|
assert len(chained_tracker_called) == 1
|
|
assert len(tracker_unsub) == 1
|
|
assert len(chained_tracker_unsub) == 2
|
|
|
|
hass.states.async_set("light.bowl", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(tracker_called) == 3
|
|
assert len(chained_tracker_called) == 3
|
|
assert len(tracker_unsub) == 1
|
|
assert len(chained_tracker_unsub) == 3
|
|
|
|
|
|
async def test_track_state_change_event_chain_single_entity(hass):
|
|
"""Test that adding a new state tracker inside a tracker does not fire right away."""
|
|
tracker_called = []
|
|
chained_tracker_called = []
|
|
|
|
chained_tracker_unsub = []
|
|
tracker_unsub = []
|
|
|
|
@ha.callback
|
|
def chained_single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
chained_tracker_called.append((old_state, new_state))
|
|
|
|
@ha.callback
|
|
def single_run_callback(event):
|
|
old_state = event.data.get("old_state")
|
|
new_state = event.data.get("new_state")
|
|
|
|
tracker_called.append((old_state, new_state))
|
|
|
|
chained_tracker_unsub.append(
|
|
async_track_state_change_event(
|
|
hass, "light.bowl", chained_single_run_callback
|
|
)
|
|
)
|
|
|
|
tracker_unsub.append(
|
|
async_track_state_change_event(hass, "light.bowl", single_run_callback)
|
|
)
|
|
|
|
hass.states.async_set("light.bowl", "on")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(tracker_called) == 1
|
|
assert len(chained_tracker_called) == 0
|
|
assert len(tracker_unsub) == 1
|
|
assert len(chained_tracker_unsub) == 1
|
|
|
|
hass.states.async_set("light.bowl", "off")
|
|
await hass.async_block_till_done()
|
|
|
|
assert len(tracker_called) == 2
|
|
assert len(chained_tracker_called) == 1
|
|
assert len(tracker_unsub) == 1
|
|
assert len(chained_tracker_unsub) == 2
|
|
|
|
|
|
async def test_track_point_in_utc_time_cancel(hass):
|
|
"""Test cancel of async track point in time."""
|
|
|
|
times = []
|
|
|
|
@ha.callback
|
|
def run_callback(utc_time):
|
|
nonlocal times
|
|
times.append(utc_time)
|
|
|
|
def _setup_listeners():
|
|
"""Ensure we test the non-async version."""
|
|
utc_now = dt_util.utcnow()
|
|
|
|
with pytest.raises(TypeError):
|
|
track_point_in_utc_time("nothass", run_callback, utc_now)
|
|
|
|
unsub1 = hass.helpers.event.track_point_in_utc_time(
|
|
run_callback, utc_now + timedelta(seconds=0.1)
|
|
)
|
|
hass.helpers.event.track_point_in_utc_time(
|
|
run_callback, utc_now + timedelta(seconds=0.1)
|
|
)
|
|
|
|
unsub1()
|
|
|
|
await hass.async_add_executor_job(_setup_listeners)
|
|
|
|
await asyncio.sleep(0.2)
|
|
|
|
assert len(times) == 1
|
|
assert times[0].tzinfo == dt_util.UTC
|
|
|
|
|
|
async def test_async_track_point_in_time_cancel(hass):
|
|
"""Test cancel of async track point in time."""
|
|
|
|
times = []
|
|
hst_tz = dt_util.get_time_zone("US/Hawaii")
|
|
dt_util.set_default_time_zone(hst_tz)
|
|
|
|
@ha.callback
|
|
def run_callback(local_time):
|
|
nonlocal times
|
|
times.append(local_time)
|
|
|
|
utc_now = dt_util.utcnow()
|
|
hst_now = utc_now.astimezone(hst_tz)
|
|
|
|
unsub1 = hass.helpers.event.async_track_point_in_time(
|
|
run_callback, hst_now + timedelta(seconds=0.1)
|
|
)
|
|
hass.helpers.event.async_track_point_in_time(
|
|
run_callback, hst_now + timedelta(seconds=0.1)
|
|
)
|
|
|
|
unsub1()
|
|
|
|
await asyncio.sleep(0.2)
|
|
|
|
assert len(times) == 1
|
|
assert times[0].tzinfo.zone == "US/Hawaii"
|
|
|
|
|
|
async def test_async_track_entity_registry_updated_event(hass):
|
|
"""Test tracking entity registry updates for an entity_id."""
|
|
|
|
entity_id = "switch.puppy_feeder"
|
|
new_entity_id = "switch.dog_feeder"
|
|
untracked_entity_id = "switch.kitty_feeder"
|
|
|
|
hass.states.async_set(entity_id, "on")
|
|
await hass.async_block_till_done()
|
|
event_data = []
|
|
|
|
@ha.callback
|
|
def run_callback(event):
|
|
event_data.append(event.data)
|
|
|
|
unsub1 = hass.helpers.event.async_track_entity_registry_updated_event(
|
|
entity_id, run_callback
|
|
)
|
|
unsub2 = hass.helpers.event.async_track_entity_registry_updated_event(
|
|
new_entity_id, run_callback
|
|
)
|
|
hass.bus.async_fire(
|
|
EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": entity_id}
|
|
)
|
|
hass.bus.async_fire(
|
|
EVENT_ENTITY_REGISTRY_UPDATED,
|
|
{"action": "create", "entity_id": untracked_entity_id},
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.bus.async_fire(
|
|
EVENT_ENTITY_REGISTRY_UPDATED,
|
|
{
|
|
"action": "update",
|
|
"entity_id": new_entity_id,
|
|
"old_entity_id": entity_id,
|
|
"changes": {},
|
|
},
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
hass.bus.async_fire(
|
|
EVENT_ENTITY_REGISTRY_UPDATED, {"action": "remove", "entity_id": new_entity_id}
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
unsub1()
|
|
unsub2()
|
|
hass.bus.async_fire(
|
|
EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": entity_id}
|
|
)
|
|
hass.bus.async_fire(
|
|
EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": new_entity_id}
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert event_data[0] == {"action": "create", "entity_id": "switch.puppy_feeder"}
|
|
assert event_data[1] == {
|
|
"action": "update",
|
|
"changes": {},
|
|
"entity_id": "switch.dog_feeder",
|
|
"old_entity_id": "switch.puppy_feeder",
|
|
}
|
|
assert event_data[2] == {"action": "remove", "entity_id": "switch.dog_feeder"}
|
|
|
|
|
|
async def test_async_track_entity_registry_updated_event_with_a_callback_that_throws(
|
|
hass,
|
|
):
|
|
"""Test tracking entity registry updates for an entity_id when one callback throws."""
|
|
|
|
entity_id = "switch.puppy_feeder"
|
|
|
|
hass.states.async_set(entity_id, "on")
|
|
await hass.async_block_till_done()
|
|
event_data = []
|
|
|
|
@ha.callback
|
|
def run_callback(event):
|
|
event_data.append(event.data)
|
|
|
|
@ha.callback
|
|
def failing_callback(event):
|
|
raise ValueError
|
|
|
|
unsub1 = hass.helpers.event.async_track_entity_registry_updated_event(
|
|
entity_id, failing_callback
|
|
)
|
|
unsub2 = hass.helpers.event.async_track_entity_registry_updated_event(
|
|
entity_id, run_callback
|
|
)
|
|
hass.bus.async_fire(
|
|
EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": entity_id}
|
|
)
|
|
await hass.async_block_till_done()
|
|
unsub1()
|
|
unsub2()
|
|
|
|
assert event_data[0] == {"action": "create", "entity_id": "switch.puppy_feeder"}
|