mirror of
https://github.com/home-assistant/core.git
synced 2026-05-21 07:50:23 +01:00
63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
"""Shared helpers for Overkiz integration tests."""
|
|
|
|
from datetime import timedelta
|
|
from typing import Any
|
|
|
|
from freezegun.api import FrozenDateTimeFactory
|
|
from pyoverkiz.models import Event
|
|
|
|
from homeassistant.components.overkiz.const import UPDATE_INTERVAL
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
from .conftest import MockOverkizClient
|
|
|
|
from tests.common import async_fire_time_changed
|
|
|
|
|
|
def assert_command_call(
|
|
mock_client: MockOverkizClient,
|
|
*,
|
|
device_url: str,
|
|
command_name: str,
|
|
parameters: list[Any] | None = None,
|
|
) -> None:
|
|
"""Assert the latest command sent through the mocked Overkiz client."""
|
|
assert mock_client.execute_command.await_count == 1
|
|
args = mock_client.execute_command.await_args.args
|
|
assert args[0] == device_url
|
|
assert args[1].name == command_name
|
|
assert args[1].parameters == (parameters or [])
|
|
assert args[2] == "Home Assistant"
|
|
|
|
|
|
def build_event(
|
|
name: str,
|
|
*,
|
|
device_url: str,
|
|
device_states: list[dict[str, Any]] | None = None,
|
|
exec_id: str | None = None,
|
|
new_state: str | None = None,
|
|
) -> Event:
|
|
"""Create a pyoverkiz event object with a test-friendly interface."""
|
|
return Event(
|
|
name=name,
|
|
device_url=device_url,
|
|
device_states=device_states,
|
|
exec_id=exec_id,
|
|
new_state=new_state,
|
|
)
|
|
|
|
|
|
async def async_deliver_events(
|
|
hass: HomeAssistant,
|
|
freezer: FrozenDateTimeFactory,
|
|
mock_client: MockOverkizClient,
|
|
*event_batches: list[Event],
|
|
update_interval: timedelta = UPDATE_INTERVAL,
|
|
) -> None:
|
|
"""Queue event batches and advance time to trigger a coordinator refresh."""
|
|
mock_client.queue_events(*event_batches)
|
|
freezer.tick(update_interval)
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|