1
0
mirror of https://github.com/home-assistant/core.git synced 2026-02-15 07:36:16 +00:00

Update waterfurnace integration to use Coordinator, instead of its own thread. (#161494)

Co-authored-by: Joostlek <joostlek@outlook.com>
This commit is contained in:
Andres Ruiz
2026-02-08 17:39:23 -05:00
committed by GitHub
parent 228fca9f0c
commit cf637f8c2f
7 changed files with 229 additions and 168 deletions

View File

@@ -2,40 +2,26 @@
from __future__ import annotations
from datetime import timedelta
import logging
import threading
import time
import voluptuous as vol
from waterfurnace.waterfurnace import WaterFurnace, WFCredentialError, WFException
from waterfurnace.waterfurnace import WaterFurnace, WFCredentialError
from homeassistant.components import persistent_notification
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_PASSWORD,
CONF_USERNAME,
EVENT_HOMEASSISTANT_STOP,
Platform,
)
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant, callback
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, Platform
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, issue_registry as ir
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN, INTEGRATION_TITLE, MAX_FAILS
from .const import DOMAIN, INTEGRATION_TITLE
from .coordinator import WaterFurnaceCoordinator
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.SENSOR]
UPDATE_TOPIC = f"{DOMAIN}_update"
SCAN_INTERVAL = timedelta(seconds=10)
ERROR_INTERVAL = timedelta(seconds=300)
NOTIFICATION_ID = "waterfurnace_website_notification"
NOTIFICATION_TITLE = "WaterFurnace website status"
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
@@ -47,7 +33,7 @@ CONFIG_SCHEMA = vol.Schema(
},
extra=vol.ALLOW_EXTRA,
)
type WaterFurnaceConfigEntry = ConfigEntry[WaterFurnaceData]
type WaterFurnaceConfigEntry = ConfigEntry[WaterFurnaceCoordinator]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
@@ -123,106 +109,9 @@ async def async_setup_entry(
"Failed to connect to WaterFurnace service: No GWID found for device"
)
entry.runtime_data = client
coordinator = WaterFurnaceCoordinator(hass, client, entry)
entry.runtime_data = coordinator
await coordinator.async_config_entry_first_refresh()
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
class WaterFurnaceData(threading.Thread):
"""WaterFurnace Data collector.
This is implemented as a dedicated thread polling a websocket in a
tight loop. The websocket will shut itself from the server side if
a packet is not sent at least every 30 seconds. The reading is
cheap, the login is less cheap, so keeping this open and polling
on a very regular cadence is actually the least io intensive thing
to do.
"""
def __init__(self, hass: HomeAssistant, client) -> None:
"""Initialize the data object."""
super().__init__()
self.hass = hass
self.client = client
self.unit = self.client.gwid
self.data = None
self._shutdown = False
self._fails = 0
self.device_metadata = next(
(device for device in client.devices if device.gwid == self.unit), None
)
def _reconnect(self):
"""Reconnect on a failure."""
self._fails += 1
if self._fails > MAX_FAILS:
_LOGGER.error("Failed to refresh login credentials. Thread stopped")
persistent_notification.create(
self.hass,
(
"Error:<br/>Connection to waterfurnace website failed "
"the maximum number of times. Thread has stopped"
),
title=NOTIFICATION_TITLE,
notification_id=NOTIFICATION_ID,
)
self._shutdown = True
return
# sleep first before the reconnect attempt
_LOGGER.debug("Sleeping for fail # %s", self._fails)
time.sleep(self._fails * ERROR_INTERVAL.total_seconds())
try:
self.client.login()
self.data = self.client.read()
except WFException:
_LOGGER.exception("Failed to reconnect attempt %s", self._fails)
else:
_LOGGER.debug("Reconnected to furnace")
self._fails = 0
def run(self):
"""Thread run loop."""
@callback
def register():
"""Connect to hass for shutdown."""
def shutdown(event):
"""Shutdown the thread."""
_LOGGER.debug("Signaled to shutdown")
self._shutdown = True
self.join()
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
self.hass.add_job(register)
# This does a tight loop in sending read calls to the
# websocket. That's a blocking call, which returns pretty
# quickly (1 second). It's important that we do this
# frequently though, because if we don't call the websocket at
# least every 30 seconds the server side closes the
# connection.
while True:
if self._shutdown:
_LOGGER.debug("Graceful shutdown")
return
try:
self.data = self.client.read()
except WFException:
# WFExceptions are things the WF library understands
# that pretty much can all be solved by logging in and
# back out again.
_LOGGER.exception("Failed to read data, attempting to recover")
self._reconnect()
else:
dispatcher_send(self.hass, UPDATE_TOPIC)
time.sleep(SCAN_INTERVAL.total_seconds())

View File

@@ -1,10 +1,8 @@
"""Constants for the WaterFurnace integration."""
from datetime import timedelta
from typing import Final
DOMAIN: Final = "waterfurnace"
INTEGRATION_TITLE: Final = "WaterFurnace"
# Connection settings
MAX_FAILS: Final = 10
UPDATE_INTERVAL: Final = timedelta(seconds=10)

View File

@@ -0,0 +1,56 @@
"""Data update coordinator for WaterFurnace."""
import logging
from typing import TYPE_CHECKING
from waterfurnace.waterfurnace import WaterFurnace, WFException, WFGateway, WFReading
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import UPDATE_INTERVAL
if TYPE_CHECKING:
from . import WaterFurnaceConfigEntry
_LOGGER = logging.getLogger(__name__)
class WaterFurnaceCoordinator(DataUpdateCoordinator[WFReading]):
"""WaterFurnace data update coordinator.
Polls the WaterFurnace API regularly to keep the websocket connection alive.
The server closes the connection if no data is requested for 30 seconds,
so frequent polling is necessary.
"""
device_metadata: WFGateway | None
def __init__(
self,
hass: HomeAssistant,
client: WaterFurnace,
config_entry: WaterFurnaceConfigEntry,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name="WaterFurnace",
update_interval=UPDATE_INTERVAL,
config_entry=config_entry,
)
self.client = client
self.unit = str(client.gwid)
self.device_metadata = None
if client.devices is not None:
self.device_metadata = next(
(device for device in client.devices if device.gwid == self.unit), None
)
async def _async_update_data(self):
"""Fetch data from WaterFurnace API with built-in retry logic."""
try:
return await self.hass.async_add_executor_job(self.client.read_with_retry)
except WFException as err:
raise UpdateFailed(str(err)) from err

View File

@@ -15,13 +15,14 @@ from homeassistant.const import (
UnitOfTemperature,
UnitOfVolumeFlowRate,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import slugify
from . import DOMAIN, UPDATE_TOPIC, WaterFurnaceConfigEntry, WaterFurnaceData
from . import DOMAIN, WaterFurnaceConfigEntry
from .coordinator import WaterFurnaceCoordinator
SENSORS = [
SensorEntityDescription(
@@ -156,62 +157,50 @@ async def async_setup_entry(
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Waterfurnace sensors from a config entry."""
data_collector = WaterFurnaceData(hass, config_entry.runtime_data)
data_collector.start()
coordinator = config_entry.runtime_data
async_add_entities(
WaterFurnaceSensor(data_collector, description) for description in SENSORS
WaterFurnaceSensor(coordinator, description) for description in SENSORS
)
class WaterFurnaceSensor(SensorEntity):
class WaterFurnaceSensor(CoordinatorEntity[WaterFurnaceCoordinator], SensorEntity):
"""Implementing the Waterfurnace sensor."""
entity_description: SensorEntityDescription
_attr_should_poll = False
_attr_has_entity_name = True
def __init__(
self, client: WaterFurnaceData, description: SensorEntityDescription
self, coordinator: WaterFurnaceCoordinator, description: SensorEntityDescription
) -> None:
"""Initialize the sensor."""
self.client = client
super().__init__(coordinator)
self.entity_description = description
# This ensures that the sensors are isolated per waterfurnace unit
self.entity_id = ENTITY_ID_FORMAT.format(
f"wf_{slugify(self.client.unit)}_{slugify(description.key)}"
f"wf_{slugify(coordinator.unit)}_{slugify(description.key)}"
)
self._attr_unique_id = f"{self.client.unit}_{description.key}"
self._attr_unique_id = f"{coordinator.unit}_{description.key}"
device_info = DeviceInfo(
identifiers={(DOMAIN, self.client.unit)},
identifiers={(DOMAIN, coordinator.unit)},
manufacturer="WaterFurnace",
name="WaterFurnace System",
)
if self.client.device_metadata:
if self.client.device_metadata.description:
if coordinator.device_metadata:
if coordinator.device_metadata.description:
# Eg. Series 7
device_info["model"] = self.client.device_metadata.description
if self.client.device_metadata.awlabctypedesc:
device_info["model"] = coordinator.device_metadata.description
if coordinator.device_metadata.awlabctypedesc:
# Eg. Series 7, 5 Ton
device_info["name"] = self.client.device_metadata.awlabctypedesc
device_info["name"] = coordinator.device_metadata.awlabctypedesc
self._attr_device_info = device_info
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
self.async_on_remove(
async_dispatcher_connect(
self.hass, UPDATE_TOPIC, self.async_update_callback
)
)
@callback
def async_update_callback(self):
"""Update state."""
if self.client.data is not None:
self._attr_native_value = getattr(
self.client.data, self.entity_description.key, None
)
self.async_write_ha_state()
@property
def native_value(self):
"""Return the native value of the sensor."""
return getattr(self.coordinator.data, self.entity_description.key, None)

View File

@@ -4,10 +4,11 @@ from collections.abc import Generator
from unittest.mock import AsyncMock, Mock, patch
import pytest
from waterfurnace.waterfurnace import WFReading
from waterfurnace.waterfurnace import WFGateway, WFReading
from homeassistant.components.waterfurnace.const import DOMAIN
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry, load_json_object_fixture
@@ -24,19 +25,31 @@ def mock_setup_entry() -> Generator[AsyncMock]:
@pytest.fixture
def mock_waterfurnace_client() -> Generator[Mock]:
"""Mock WaterFurnace client."""
with patch(
"homeassistant.components.waterfurnace.config_flow.WaterFurnace",
autospec=True,
) as mock_client_class:
mock_client = mock_client_class.return_value
with (
patch(
"homeassistant.components.waterfurnace.config_flow.WaterFurnace",
autospec=True,
) as mock_client,
patch(
"homeassistant.components.waterfurnace.WaterFurnace",
new=mock_client,
),
):
client = mock_client.return_value
client.gwid = "TEST_GWID_12345"
mock_client.gwid = "TEST_GWID_12345"
gateway_data = {
"gwid": "TEST_GWID_12345",
"description": "Test WaterFurnace Device",
"awlabctypedesc": "Test ABC Type",
}
client.devices = [WFGateway(gateway_data)]
device_data = WFReading(load_json_object_fixture("device_data.json", DOMAIN))
client.read.return_value = device_data
client.read_with_retry.return_value = device_data
mock_client.read.return_value = device_data
yield mock_client
yield client
@pytest.fixture
@@ -51,3 +64,17 @@ def mock_config_entry() -> MockConfigEntry:
},
unique_id="TEST_GWID_12345",
)
@pytest.fixture
async def init_integration(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_waterfurnace_client: Mock,
) -> MockConfigEntry:
"""Set up the WaterFurnace integration for testing."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
return mock_config_entry

View File

@@ -0,0 +1,27 @@
"""Tests for WaterFurnace integration setup."""
from unittest.mock import Mock
from waterfurnace.waterfurnace import WFCredentialError
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def test_setup_auth_failure(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_waterfurnace_client: Mock,
) -> None:
"""Test setup fails with auth error."""
mock_waterfurnace_client.login.side_effect = WFCredentialError(
"Invalid credentials"
)
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR

View File

@@ -0,0 +1,75 @@
"""Test sensor of WaterFurnace integration."""
import asyncio
from unittest.mock import Mock
from freezegun.api import FrozenDateTimeFactory
import pytest
from waterfurnace.waterfurnace import WFException
from homeassistant.components.waterfurnace.const import UPDATE_INTERVAL
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from tests.common import async_fire_time_changed
@pytest.mark.usefixtures("init_integration")
async def test_sensor(
hass: HomeAssistant,
mock_waterfurnace_client: Mock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test states of the sensor."""
state = hass.states.get("sensor.wf_test_gwid_12345_totalunitpower")
assert state
assert state.state == "1500"
mock_waterfurnace_client.read_with_retry.return_value.totalunitpower = 2000
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get("sensor.wf_test_gwid_12345_totalunitpower")
assert state
assert state.state == "2000"
@pytest.mark.usefixtures("init_integration")
@pytest.mark.parametrize(
"side_effect",
[
WFException("Connection failed"),
asyncio.TimeoutError,
],
)
async def test_availability(
hass: HomeAssistant,
mock_waterfurnace_client: Mock,
freezer: FrozenDateTimeFactory,
side_effect: Exception,
) -> None:
"""Ensure that we mark the entities unavailable correctly when service is offline."""
entity_id = "sensor.wf_test_gwid_12345_totalunitpower"
state = hass.states.get(entity_id)
assert state
assert state.state == "1500"
mock_waterfurnace_client.read_with_retry.side_effect = side_effect
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_UNAVAILABLE
mock_waterfurnace_client.read_with_retry.side_effect = None
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == "1500"