1
0
mirror of https://github.com/home-assistant/core.git synced 2026-07-03 12:46:09 +01:00
Files
2026-06-18 21:41:53 +02:00

77 lines
2.5 KiB
Python

"""Fixtures for AquaLogic tests."""
from collections.abc import Callable, Generator
from unittest.mock import MagicMock, patch

import pytest

from homeassistant.components.aqualogic import DOMAIN, AquaLogicProcessor
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@pytest.fixture
def mock_panel() -> MagicMock:
    """Build a MagicMock standing in for an AquaLogic panel.

    The mock is pre-loaded with fixed readings for every attribute listed
    below, and its ``get_state()`` method returns ``True`` regardless of the
    arguments it is called with.
    """
    readings = {
        "is_metric": True,
        "air_temp": 25.5,
        "pool_temp": 28.0,
        "spa_temp": 30.0,
        "pool_chlorinator": 50.0,
        "spa_chlorinator": 50.0,
        "salt_level": 3.5,
        "pump_speed": 100.0,
        "pump_power": 1000.0,
        "status": "OK",
        # Dotted key: configures the return value of the child mock.
        "get_state.return_value": True,
    }
    fake_panel = MagicMock()
    fake_panel.configure_mock(**readings)
    return fake_panel
@pytest.fixture
def mock_processor(mock_panel: MagicMock) -> Generator[MagicMock]:
    """Patch AquaLogicProcessor and yield the mock instance it produces.

    While the fixture is active, calling
    ``homeassistant.components.aqualogic.AquaLogicProcessor(...)`` returns the
    yielded mock, whose ``panel`` attribute is ``mock_panel``. The patch is
    removed when the test using the fixture finishes.
    """
    with patch("homeassistant.components.aqualogic.AquaLogicProcessor") as mock_cls:
        processor = MagicMock()
        processor.panel = mock_panel
        # Any construction of the patched class hands back this one instance.
        mock_cls.return_value = processor
        # Yield (not return) so the patch stays in place for the test body.
        yield processor
@pytest.fixture
async def init_integration(
    hass: HomeAssistant, mock_panel: MagicMock
) -> AquaLogicProcessor:
    """Set up the AquaLogic integration and drive a single pass of run().

    The AquaLogic class is patched so that instantiating it yields mock_panel.
    After setup, the processor stored in hass.data has _shutdown set before
    run() is called directly, so run() exits after one iteration. That one
    pass hands the data_changed callback to panel.process() without starting
    a real network thread.
    """
    config = {DOMAIN: {CONF_HOST: "1.2.3.4", CONF_PORT: 8899}}

    with patch(
        "homeassistant.components.aqualogic.AquaLogic", return_value=mock_panel
    ):
        setup_ok = await async_setup_component(hass, DOMAIN, config)
        assert setup_ok
        await hass.async_block_till_done()

        aqualogic_processor: AquaLogicProcessor = hass.data[DOMAIN]
        # Flag shutdown first so the direct run() call below returns.
        aqualogic_processor._shutdown = True
        # Must run while the patch is active so the mocked panel is used.
        aqualogic_processor.run()

    return aqualogic_processor
@pytest.fixture
def update_callback(
    init_integration: AquaLogicProcessor, mock_panel: MagicMock
) -> Callable[[], None]:
    """Provide a zero-argument callable that simulates a panel data update.

    The callback is taken from the first positional argument recorded on the
    most recent mock_panel.process() call, so tests fire updates through the
    same function that was handed to the panel during setup.
    """
    positional_args = mock_panel.process.call_args[0]
    on_data_changed = positional_args[0]

    def fire_update() -> None:
        # Invoke the registered callback with the mock panel as its argument.
        return on_data_changed(mock_panel)

    return fire_update