1
0
mirror of https://github.com/home-assistant/core.git synced 2026-02-15 07:36:16 +00:00

Add actron_air climate integration (#134740)

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
This commit is contained in:
Kurt
2025-10-14 03:10:35 +10:00
committed by GitHub
parent f0756af52d
commit 04f83bc067
17 changed files with 906 additions and 0 deletions

2
CODEOWNERS generated
View File

@@ -46,6 +46,8 @@ build.json @home-assistant/supervisor
/tests/components/accuweather/ @bieniu
/homeassistant/components/acmeda/ @atmurray
/tests/components/acmeda/ @atmurray
/homeassistant/components/actron_air/ @kclif9 @JagadishDhanamjayam
/tests/components/actron_air/ @kclif9 @JagadishDhanamjayam
/homeassistant/components/adax/ @danielhiversen @lazytarget
/tests/components/adax/ @danielhiversen @lazytarget
/homeassistant/components/adguard/ @frenck

View File

@@ -0,0 +1,57 @@
"""The Actron Air integration."""
from actron_neo_api import (
ActronAirNeoACSystem,
ActronNeoAPI,
ActronNeoAPIError,
ActronNeoAuthError,
)
from homeassistant.const import CONF_API_TOKEN, Platform
from homeassistant.core import HomeAssistant
from .const import _LOGGER
from .coordinator import (
ActronAirConfigEntry,
ActronAirRuntimeData,
ActronAirSystemCoordinator,
)
PLATFORM = [Platform.CLIMATE]
async def async_setup_entry(hass: HomeAssistant, entry: ActronAirConfigEntry) -> bool:
"""Set up Actron Air integration from a config entry."""
api = ActronNeoAPI(refresh_token=entry.data[CONF_API_TOKEN])
systems: list[ActronAirNeoACSystem] = []
try:
systems = await api.get_ac_systems()
await api.update_status()
except ActronNeoAuthError:
_LOGGER.error("Authentication error while setting up Actron Air integration")
raise
except ActronNeoAPIError as err:
_LOGGER.error("API error while setting up Actron Air integration: %s", err)
raise
system_coordinators: dict[str, ActronAirSystemCoordinator] = {}
for system in systems:
coordinator = ActronAirSystemCoordinator(hass, entry, api, system)
_LOGGER.debug("Setting up coordinator for system: %s", system["serial"])
await coordinator.async_config_entry_first_refresh()
system_coordinators[system["serial"]] = coordinator
entry.runtime_data = ActronAirRuntimeData(
api=api,
system_coordinators=system_coordinators,
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORM)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ActronAirConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORM)

View File

@@ -0,0 +1,259 @@
"""Climate platform for Actron Air integration."""
from typing import Any
from actron_neo_api import ActronAirNeoStatus, ActronAirNeoZone
from homeassistant.components.climate import (
FAN_AUTO,
FAN_HIGH,
FAN_LOW,
FAN_MEDIUM,
ClimateEntity,
ClimateEntityFeature,
HVACMode,
)
from homeassistant.const import ATTR_TEMPERATURE, UnitOfTemperature
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import ActronAirConfigEntry, ActronAirSystemCoordinator
PARALLEL_UPDATES = 0
FAN_MODE_MAPPING_ACTRONAIR_TO_HA = {
"AUTO": FAN_AUTO,
"LOW": FAN_LOW,
"MED": FAN_MEDIUM,
"HIGH": FAN_HIGH,
}
FAN_MODE_MAPPING_HA_TO_ACTRONAIR = {
v: k for k, v in FAN_MODE_MAPPING_ACTRONAIR_TO_HA.items()
}
HVAC_MODE_MAPPING_ACTRONAIR_TO_HA = {
"COOL": HVACMode.COOL,
"HEAT": HVACMode.HEAT,
"FAN": HVACMode.FAN_ONLY,
"AUTO": HVACMode.AUTO,
"OFF": HVACMode.OFF,
}
HVAC_MODE_MAPPING_HA_TO_ACTRONAIR = {
v: k for k, v in HVAC_MODE_MAPPING_ACTRONAIR_TO_HA.items()
}
async def async_setup_entry(
hass: HomeAssistant,
entry: ActronAirConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Actron Air climate entities."""
system_coordinators = entry.runtime_data.system_coordinators
entities: list[ClimateEntity] = []
for coordinator in system_coordinators.values():
status = coordinator.data
name = status.ac_system.system_name
entities.append(ActronSystemClimate(coordinator, name))
entities.extend(
ActronZoneClimate(coordinator, zone)
for zone in status.remote_zone_info
if zone.exists
)
async_add_entities(entities)
class BaseClimateEntity(CoordinatorEntity[ActronAirSystemCoordinator], ClimateEntity):
"""Base class for Actron Air climate entities."""
_attr_has_entity_name = True
_attr_temperature_unit = UnitOfTemperature.CELSIUS
_attr_supported_features = (
ClimateEntityFeature.TARGET_TEMPERATURE
| ClimateEntityFeature.FAN_MODE
| ClimateEntityFeature.TURN_ON
| ClimateEntityFeature.TURN_OFF
)
_attr_name = None
_attr_fan_modes = list(FAN_MODE_MAPPING_ACTRONAIR_TO_HA.values())
_attr_hvac_modes = list(HVAC_MODE_MAPPING_ACTRONAIR_TO_HA.values())
def __init__(
self,
coordinator: ActronAirSystemCoordinator,
name: str,
) -> None:
"""Initialize an Actron Air unit."""
super().__init__(coordinator)
self._serial_number = coordinator.serial_number
class ActronSystemClimate(BaseClimateEntity):
"""Representation of the Actron Air system."""
_attr_supported_features = (
ClimateEntityFeature.TARGET_TEMPERATURE
| ClimateEntityFeature.FAN_MODE
| ClimateEntityFeature.TURN_ON
| ClimateEntityFeature.TURN_OFF
)
def __init__(
self,
coordinator: ActronAirSystemCoordinator,
name: str,
) -> None:
"""Initialize an Actron Air unit."""
super().__init__(coordinator, name)
serial_number = coordinator.serial_number
self._attr_unique_id = serial_number
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, serial_number)},
name=self._status.ac_system.system_name,
manufacturer="Actron Air",
model_id=self._status.ac_system.master_wc_model,
sw_version=self._status.ac_system.master_wc_firmware_version,
serial_number=serial_number,
)
@property
def min_temp(self) -> float:
"""Return the minimum temperature that can be set."""
return self._status.min_temp
@property
def max_temp(self) -> float:
"""Return the maximum temperature that can be set."""
return self._status.max_temp
@property
def _status(self) -> ActronAirNeoStatus:
"""Get the current status from the coordinator."""
return self.coordinator.data
@property
def hvac_mode(self) -> HVACMode | None:
"""Return the current HVAC mode."""
if not self._status.user_aircon_settings.is_on:
return HVACMode.OFF
mode = self._status.user_aircon_settings.mode
return HVAC_MODE_MAPPING_ACTRONAIR_TO_HA.get(mode)
@property
def fan_mode(self) -> str | None:
"""Return the current fan mode."""
fan_mode = self._status.user_aircon_settings.fan_mode
return FAN_MODE_MAPPING_ACTRONAIR_TO_HA.get(fan_mode)
@property
def current_humidity(self) -> float:
"""Return the current humidity."""
return self._status.master_info.live_humidity_pc
@property
def current_temperature(self) -> float:
"""Return the current temperature."""
return self._status.master_info.live_temp_c
@property
def target_temperature(self) -> float:
"""Return the target temperature."""
return self._status.user_aircon_settings.temperature_setpoint_cool_c
async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set a new fan mode."""
api_fan_mode = FAN_MODE_MAPPING_HA_TO_ACTRONAIR.get(fan_mode.lower())
await self._status.user_aircon_settings.set_fan_mode(api_fan_mode)
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set the HVAC mode."""
ac_mode = HVAC_MODE_MAPPING_HA_TO_ACTRONAIR.get(hvac_mode)
await self._status.ac_system.set_system_mode(ac_mode)
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set the temperature."""
temp = kwargs.get(ATTR_TEMPERATURE)
await self._status.user_aircon_settings.set_temperature(temperature=temp)
class ActronZoneClimate(BaseClimateEntity):
"""Representation of a zone within the Actron Air system."""
_attr_supported_features = (
ClimateEntityFeature.TARGET_TEMPERATURE
| ClimateEntityFeature.TURN_ON
| ClimateEntityFeature.TURN_OFF
)
def __init__(
self,
coordinator: ActronAirSystemCoordinator,
zone: ActronAirNeoZone,
) -> None:
"""Initialize an Actron Air unit."""
super().__init__(coordinator, zone.title)
serial_number = coordinator.serial_number
self._zone_id: int = zone.zone_id
self._attr_unique_id: str = f"{serial_number}_zone_{zone.zone_id}"
self._attr_device_info: DeviceInfo = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
name=zone.title,
manufacturer="Actron Air",
model="Zone",
suggested_area=zone.title,
via_device=(DOMAIN, serial_number),
)
@property
def min_temp(self) -> float:
"""Return the minimum temperature that can be set."""
return self._zone.min_temp
@property
def max_temp(self) -> float:
"""Return the maximum temperature that can be set."""
return self._zone.max_temp
@property
def _zone(self) -> ActronAirNeoZone:
"""Get the current zone data from the coordinator."""
status = self.coordinator.data
return status.zones[self._zone_id]
@property
def hvac_mode(self) -> HVACMode | None:
"""Return the current HVAC mode."""
if self._zone.is_active:
mode = self._zone.hvac_mode
return HVAC_MODE_MAPPING_ACTRONAIR_TO_HA.get(mode)
return HVACMode.OFF
@property
def current_humidity(self) -> float | None:
"""Return the current humidity."""
return self._zone.humidity
@property
def current_temperature(self) -> float | None:
"""Return the current temperature."""
return self._zone.live_temp_c
@property
def target_temperature(self) -> float | None:
"""Return the target temperature."""
return self._zone.temperature_setpoint_cool_c
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set the HVAC mode."""
is_enabled = hvac_mode != HVACMode.OFF
await self._zone.enable(is_enabled)
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set the temperature."""
await self._zone.set_temperature(temperature=kwargs["temperature"])

View File

@@ -0,0 +1,132 @@
"""Setup config flow for Actron Air integration."""
import asyncio
from typing import Any
from actron_neo_api import ActronNeoAPI, ActronNeoAuthError
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_API_TOKEN
from homeassistant.exceptions import HomeAssistantError
from .const import _LOGGER, DOMAIN
class ActronAirConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Actron Air."""
def __init__(self) -> None:
"""Initialize the config flow."""
self._api: ActronNeoAPI | None = None
self._device_code: str | None = None
self._user_code: str = ""
self._verification_uri: str = ""
self._expires_minutes: str = "30"
self.login_task: asyncio.Task | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
if self._api is None:
_LOGGER.debug("Initiating device authorization")
self._api = ActronNeoAPI()
try:
device_code_response = await self._api.request_device_code()
except ActronNeoAuthError as err:
_LOGGER.error("OAuth2 flow failed: %s", err)
return self.async_abort(reason="oauth2_error")
self._device_code = device_code_response["device_code"]
self._user_code = device_code_response["user_code"]
self._verification_uri = device_code_response["verification_uri_complete"]
self._expires_minutes = str(device_code_response["expires_in"] // 60)
async def _wait_for_authorization() -> None:
"""Wait for the user to authorize the device."""
assert self._api is not None
assert self._device_code is not None
_LOGGER.debug("Waiting for device authorization")
try:
await self._api.poll_for_token(self._device_code)
_LOGGER.debug("Authorization successful")
except ActronNeoAuthError as ex:
_LOGGER.exception("Error while waiting for device authorization")
raise CannotConnect from ex
_LOGGER.debug("Checking login task")
if self.login_task is None:
_LOGGER.debug("Creating task for device authorization")
self.login_task = self.hass.async_create_task(_wait_for_authorization())
if self.login_task.done():
_LOGGER.debug("Login task is done, checking results")
if exception := self.login_task.exception():
if isinstance(exception, CannotConnect):
return self.async_show_progress_done(
next_step_id="connection_error"
)
return self.async_show_progress_done(next_step_id="timeout")
return self.async_show_progress_done(next_step_id="finish_login")
return self.async_show_progress(
step_id="user",
progress_action="wait_for_authorization",
description_placeholders={
"user_code": self._user_code,
"verification_uri": self._verification_uri,
"expires_minutes": self._expires_minutes,
},
progress_task=self.login_task,
)
async def async_step_finish_login(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the finalization of login."""
_LOGGER.debug("Finalizing authorization")
assert self._api is not None
try:
user_data = await self._api.get_user_info()
except ActronNeoAuthError as err:
_LOGGER.error("Error getting user info: %s", err)
return self.async_abort(reason="oauth2_error")
unique_id = str(user_data["id"])
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_data["email"],
data={CONF_API_TOKEN: self._api.refresh_token_value},
)
async def async_step_timeout(
self,
user_input: dict[str, Any] | None = None,
) -> ConfigFlowResult:
"""Handle issues that need transition await from progress step."""
if user_input is None:
return self.async_show_form(
step_id="timeout",
)
del self.login_task
return await self.async_step_user()
async def async_step_connection_error(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle connection error from progress step."""
if user_input is None:
return self.async_show_form(step_id="connection_error")
# Reset state and try again
self._api = None
self._device_code = None
self.login_task = None
return await self.async_step_user()
class CannotConnect(HomeAssistantError):
"""Error to indicate we cannot connect."""

View File

@@ -0,0 +1,6 @@
"""Constants used by Actron Air integration."""
import logging
_LOGGER = logging.getLogger(__package__)
DOMAIN = "actron_air"

View File

@@ -0,0 +1,69 @@
"""Coordinator for Actron Air integration."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from actron_neo_api import ActronAirNeoACSystem, ActronAirNeoStatus, ActronNeoAPI
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util import dt as dt_util
from .const import _LOGGER
STALE_DEVICE_TIMEOUT = timedelta(hours=24)
ERROR_NO_SYSTEMS_FOUND = "no_systems_found"
ERROR_UNKNOWN = "unknown_error"
@dataclass
class ActronAirRuntimeData:
"""Runtime data for the Actron Air integration."""
api: ActronNeoAPI
system_coordinators: dict[str, ActronAirSystemCoordinator]
type ActronAirConfigEntry = ConfigEntry[ActronAirRuntimeData]
AUTH_ERROR_THRESHOLD = 3
SCAN_INTERVAL = timedelta(seconds=30)
class ActronAirSystemCoordinator(DataUpdateCoordinator[ActronAirNeoACSystem]):
"""System coordinator for Actron Air integration."""
def __init__(
self,
hass: HomeAssistant,
entry: ActronAirConfigEntry,
api: ActronNeoAPI,
system: ActronAirNeoACSystem,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name="Actron Air Status",
update_interval=SCAN_INTERVAL,
config_entry=entry,
)
self.system = system
self.serial_number = system["serial"]
self.api = api
self.status = self.api.state_manager.get_status(self.serial_number)
self.last_seen = dt_util.utcnow()
async def _async_update_data(self) -> ActronAirNeoStatus:
"""Fetch updates and merge incremental changes into the full state."""
await self.api.update_status()
self.status = self.api.state_manager.get_status(self.serial_number)
self.last_seen = dt_util.utcnow()
return self.status
def is_device_stale(self) -> bool:
"""Check if a device is stale (not seen for a while)."""
return (dt_util.utcnow() - self.last_seen) > STALE_DEVICE_TIMEOUT

View File

@@ -0,0 +1,16 @@
{
"domain": "actron_air",
"name": "Actron Air",
"codeowners": ["@kclif9", "@JagadishDhanamjayam"],
"config_flow": true,
"dhcp": [
{
"hostname": "neo-*",
"macaddress": "FC0FE7*"
}
],
"documentation": "https://www.home-assistant.io/integrations/actron_air",
"iot_class": "cloud_polling",
"quality_scale": "bronze",
"requirements": ["actron-neo-api==0.1.84"]
}

View File

@@ -0,0 +1,78 @@
rules:
# Bronze
action-setup:
status: exempt
comment: This integration does not have custom service actions.
appropriate-polling: done
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: This integration does not have custom service actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: This integration does not subscribe to external events.
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions: todo
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: No options flow
docs-installation-parameters: done
entity-unavailable: done
integration-owner: done
log-when-unavailable: done
parallel-updates: done
reauthentication-flow: todo
test-coverage: todo
# Gold
devices: done
diagnostics: todo
discovery-update-info:
status: exempt
comment: This integration uses DHCP discovery, however is cloud polling. Therefore there is no information to update.
discovery: done
docs-data-update: done
docs-examples: done
docs-known-limitations: done
docs-supported-devices: done
docs-supported-functions: done
docs-troubleshooting: done
docs-use-cases: done
dynamic-devices: todo
entity-category:
status: exempt
comment: This integration does not use entity categories.
entity-device-class:
status: exempt
comment: This integration does not use entity device classes.
entity-disabled-by-default:
status: exempt
comment: Not required for this integration at this stage.
entity-translations: todo
exception-translations: todo
icon-translations: todo
reconfiguration-flow: todo
repair-issues:
status: exempt
comment: This integration does not have any known issues that require repair.
stale-devices: todo
# Platinum
async-dependency: done
inject-websession: todo
strict-typing: todo

View File

@@ -0,0 +1,29 @@
{
"config": {
"step": {
"user": {
"title": "Actron Air OAuth2 Authorization"
},
"timeout": {
"title": "Authorization timeout",
"description": "The authorization process timed out. Please try again.",
"data": {}
},
"connection_error": {
"title": "Connection error",
"description": "Failed to connect to Actron Air. Please check your internet connection and try again.",
"data": {}
}
},
"progress": {
"wait_for_authorization": "To authenticate, open the following URL and login at Actron Air:\n{verification_uri}\nIf the code is not automatically copied, paste the following code to authorize the integration:\n\n```{user_code}```\n\n\nThe login attempt will time out after {expires_minutes} minutes."
},
"error": {
"oauth2_error": "Failed to start OAuth2 flow. Please try again later."
},
"abort": {
"oauth2_error": "Failed to start OAuth2 flow",
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]"
}
}
}

View File

@@ -28,6 +28,7 @@ FLOWS = {
"acaia",
"accuweather",
"acmeda",
"actron_air",
"adax",
"adguard",
"advantage_air",

View File

@@ -8,6 +8,11 @@ from __future__ import annotations
from typing import Final
DHCP: Final[list[dict[str, str | bool]]] = [
{
"domain": "actron_air",
"hostname": "neo-*",
"macaddress": "FC0FE7*",
},
{
"domain": "airthings",
"hostname": "airthings-view",

View File

@@ -47,6 +47,12 @@
"config_flow": false,
"iot_class": "local_polling"
},
"actron_air": {
"name": "Actron Air",
"integration_type": "hub",
"config_flow": true,
"iot_class": "cloud_polling"
},
"adax": {
"name": "Adax",
"integration_type": "hub",

3
requirements_all.txt generated
View File

@@ -133,6 +133,9 @@ WSDiscovery==2.1.2
# homeassistant.components.accuweather
accuweather==4.2.2
# homeassistant.components.actron_air
actron-neo-api==0.1.84
# homeassistant.components.adax
adax==0.4.0

View File

@@ -121,6 +121,9 @@ WSDiscovery==2.1.2
# homeassistant.components.accuweather
accuweather==4.2.2
# homeassistant.components.actron_air
actron-neo-api==0.1.84
# homeassistant.components.adax
adax==0.4.0

View File

@@ -0,0 +1 @@
"""Tests for the Actron Air integration."""

View File

@@ -0,0 +1,55 @@
"""Test fixtures for the Actron Air Integration."""
import asyncio
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
import pytest
@pytest.fixture
def mock_actron_api() -> Generator[AsyncMock]:
"""Mock the Actron Air API class."""
with (
patch(
"homeassistant.components.actron_air.ActronNeoAPI",
autospec=True,
) as mock_api,
patch(
"homeassistant.components.actron_air.config_flow.ActronNeoAPI",
new=mock_api,
),
):
api = mock_api.return_value
# Mock device code request
api.request_device_code.return_value = {
"device_code": "test_device_code",
"user_code": "ABC123",
"verification_uri_complete": "https://example.com/device",
"expires_in": 1800,
}
# Mock successful token polling (with a small delay to test progress)
async def slow_poll_for_token(device_code):
await asyncio.sleep(0.1) # Small delay to allow progress state to be tested
return {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
}
api.poll_for_token = slow_poll_for_token
# Mock user info
api.get_user_info = AsyncMock(
return_value={"id": "test_user_id", "email": "test@example.com"}
)
# Mock refresh token property
api.refresh_token_value = "test_refresh_token"
# Mock other API methods that might be used
api.get_systems = AsyncMock(return_value=[])
api.get_status = AsyncMock(return_value=None)
yield api

View File

@@ -0,0 +1,184 @@
"""Config flow tests for the Actron Air Integration."""
import asyncio
from unittest.mock import AsyncMock
from actron_neo_api import ActronNeoAuthError
from homeassistant import config_entries
from homeassistant.components.actron_air.const import DOMAIN
from homeassistant.const import CONF_API_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
async def test_user_flow_oauth2_success(
hass: HomeAssistant, mock_actron_api: AsyncMock
) -> None:
"""Test successful OAuth2 device code flow."""
# Start the config flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Should start with a progress step
assert result["type"] is FlowResultType.SHOW_PROGRESS
assert result["step_id"] == "user"
assert result["progress_action"] == "wait_for_authorization"
assert result["description_placeholders"] is not None
assert "user_code" in result["description_placeholders"]
assert result["description_placeholders"]["user_code"] == "ABC123"
# Wait for the progress to complete
await hass.async_block_till_done()
# Continue the flow after progress is done - this should complete the entire flow
result = await hass.config_entries.flow.async_configure(result["flow_id"])
# Should create entry on successful token exchange
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test@example.com"
assert result["data"] == {
CONF_API_TOKEN: "test_refresh_token",
}
assert result["result"].unique_id == "test_user_id"
async def test_user_flow_oauth2_pending(hass: HomeAssistant, mock_actron_api) -> None:
"""Test OAuth2 flow when authorization is still pending."""
# Make poll_for_token hang indefinitely to simulate pending state
async def hang_forever(device_code):
await asyncio.Event().wait() # This will never complete
mock_actron_api.poll_for_token = hang_forever
# Start the config flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Should start with a progress step since the task will never complete
assert result["type"] is FlowResultType.SHOW_PROGRESS
assert result["step_id"] == "user"
assert result["progress_action"] == "wait_for_authorization"
assert result["description_placeholders"] is not None
assert "user_code" in result["description_placeholders"]
assert result["description_placeholders"]["user_code"] == "ABC123"
# The background task should be running but not completed
# In a real scenario, the user would wait for authorization on their device
async def test_user_flow_oauth2_error(hass: HomeAssistant, mock_actron_api) -> None:
"""Test OAuth2 flow with authentication error during device code request."""
# Override the default mock to raise an error
mock_actron_api.request_device_code = AsyncMock(
side_effect=ActronNeoAuthError("OAuth2 error")
)
# Start the flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Should abort with oauth2_error
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "oauth2_error"
async def test_user_flow_token_polling_error(
hass: HomeAssistant, mock_actron_api
) -> None:
"""Test OAuth2 flow with error during token polling."""
# Override the default mock to raise an error during token polling
mock_actron_api.poll_for_token = AsyncMock(
side_effect=ActronNeoAuthError("Token polling error")
)
# Start the config flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Since the error occurs immediately, the task completes and we get progress_done
assert result["type"] is FlowResultType.SHOW_PROGRESS_DONE
assert result["step_id"] == "connection_error"
# Continue to the connection_error step
result = await hass.config_entries.flow.async_configure(result["flow_id"])
# Should show the connection error form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "connection_error"
# Now fix the mock to allow successful token polling for recovery
async def successful_poll_for_token(device_code):
await asyncio.sleep(0.1) # Small delay to allow progress state
return {
"access_token": "test_access_token",
"refresh_token": "test_refresh_token",
}
mock_actron_api.poll_for_token = successful_poll_for_token
# User clicks retry button - this should restart the flow and succeed
result = await hass.config_entries.flow.async_configure(result["flow_id"], {})
# Should start progress again
assert result["type"] is FlowResultType.SHOW_PROGRESS
assert result["step_id"] == "user"
assert result["progress_action"] == "wait_for_authorization"
# Wait for the progress to complete
await hass.async_block_till_done()
# Continue the flow after progress is done - this should complete successfully
result = await hass.config_entries.flow.async_configure(result["flow_id"])
# Should create entry on successful recovery
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test@example.com"
assert result["data"] == {
CONF_API_TOKEN: "test_refresh_token",
}
assert result["result"].unique_id == "test_user_id"
async def test_user_flow_duplicate_account(
hass: HomeAssistant, mock_actron_api: AsyncMock
) -> None:
"""Test duplicate account handling - should abort when same account is already configured."""
# Create an existing config entry for the same user account
existing_entry = MockConfigEntry(
domain=DOMAIN,
title="test@example.com",
data={CONF_API_TOKEN: "existing_refresh_token"},
unique_id="test_user_id",
)
existing_entry.add_to_hass(hass)
# Start the config flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Should start with a progress step
assert result["type"] is FlowResultType.SHOW_PROGRESS
assert result["step_id"] == "user"
assert result["progress_action"] == "wait_for_authorization"
assert result["description_placeholders"] is not None
assert "user_code" in result["description_placeholders"]
assert result["description_placeholders"]["user_code"] == "ABC123"
# Wait for the progress to complete
await hass.async_block_till_done()
# Continue the flow after progress is done
result = await hass.config_entries.flow.async_configure(result["flow_id"])
# Should abort because the account is already configured
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "already_configured"