1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-24 21:06:19 +00:00

Cache token info in Wallbox (#154147)

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
This commit is contained in:
Hessel
2025-11-20 11:12:11 +01:00
committed by GitHub
parent 12c04f5571
commit 25e2c9ee80
7 changed files with 158 additions and 82 deletions

View File

@@ -6,15 +6,15 @@ from wallbox import Wallbox
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from .const import UPDATE_INTERVAL
from .coordinator import (
InvalidAuth,
WallboxConfigEntry,
WallboxCoordinator,
async_validate_input,
from .const import (
CHARGER_JWT_REFRESH_TOKEN,
CHARGER_JWT_REFRESH_TTL,
CHARGER_JWT_TOKEN,
CHARGER_JWT_TTL,
UPDATE_INTERVAL,
)
from .coordinator import WallboxConfigEntry, WallboxCoordinator, check_token_validity
PLATFORMS = [
Platform.LOCK,
@@ -32,10 +32,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: WallboxConfigEntry) -> b
entry.data[CONF_PASSWORD],
jwtTokenDrift=UPDATE_INTERVAL,
)
try:
await async_validate_input(hass, wallbox)
except InvalidAuth as ex:
raise ConfigEntryAuthFailed from ex
if CHARGER_JWT_TOKEN in entry.data and check_token_validity(
jwt_token_ttl=entry.data.get(CHARGER_JWT_TTL, 0),
jwt_token_drift=UPDATE_INTERVAL,
):
wallbox.jwtToken = entry.data.get(CHARGER_JWT_TOKEN)
wallbox.jwtRefreshToken = entry.data.get(CHARGER_JWT_REFRESH_TOKEN)
wallbox.jwtTokenTtl = entry.data.get(CHARGER_JWT_TTL)
wallbox.jwtRefreshTokenTtl = entry.data.get(CHARGER_JWT_REFRESH_TTL)
wallbox.headers["Authorization"] = f"Bearer {entry.data.get(CHARGER_JWT_TOKEN)}"
wallbox_coordinator = WallboxCoordinator(hass, entry, wallbox)
await wallbox_coordinator.async_config_entry_first_refresh()

View File

@@ -12,7 +12,15 @@ from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlow, ConfigFlowRe
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from .const import CONF_STATION, DOMAIN
from .const import (
CHARGER_JWT_REFRESH_TOKEN,
CHARGER_JWT_REFRESH_TTL,
CHARGER_JWT_TOKEN,
CHARGER_JWT_TTL,
CONF_STATION,
DOMAIN,
UPDATE_INTERVAL,
)
from .coordinator import InvalidAuth, async_validate_input
COMPONENT_DOMAIN = DOMAIN
@@ -26,17 +34,22 @@ STEP_USER_DATA_SCHEMA = vol.Schema(
)
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, str]:
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the user input allows to connect.
Data has the keys from STEP_USER_DATA_SCHEMA with values provided by the user.
"""
wallbox = Wallbox(data["username"], data["password"])
wallbox = Wallbox(data[CONF_USERNAME], data[CONF_PASSWORD], UPDATE_INTERVAL)
await async_validate_input(hass, wallbox)
data[CHARGER_JWT_TOKEN] = wallbox.jwtToken
data[CHARGER_JWT_REFRESH_TOKEN] = wallbox.jwtRefreshToken
data[CHARGER_JWT_TTL] = wallbox.jwtTokenTtl
data[CHARGER_JWT_REFRESH_TTL] = wallbox.jwtRefreshTokenTtl
# Return info that you want to store in the config entry.
return {"title": "Wallbox Portal"}
return {"title": "Wallbox Portal", "data": data}
class WallboxConfigFlow(ConfigFlow, domain=COMPONENT_DOMAIN):
@@ -64,8 +77,11 @@ class WallboxConfigFlow(ConfigFlow, domain=COMPONENT_DOMAIN):
await self.async_set_unique_id(user_input["station"])
if self.source != SOURCE_REAUTH:
self._abort_if_unique_id_configured()
info = await validate_input(self.hass, user_input)
return self.async_create_entry(title=info["title"], data=user_input)
validation_data = await validate_input(self.hass, user_input)
return self.async_create_entry(
title=validation_data["title"],
data=validation_data["data"],
)
reauth_entry = self._get_reauth_entry()
if user_input["station"] == reauth_entry.data[CONF_STATION]:
return self.async_update_reload_and_abort(reauth_entry, data=user_input)

View File

@@ -47,6 +47,12 @@ CHARGER_CONNECTIONS = "connections"
CHARGER_ECO_SMART_KEY = "ecosmart"
CHARGER_ECO_SMART_STATUS_KEY = "enabled"
CHARGER_ECO_SMART_MODE_KEY = "mode"
CHARGER_WALLBOX_OBJECT_KEY = "wallbox"
CHARGER_JWT_TOKEN = "jwtToken"
CHARGER_JWT_REFRESH_TOKEN = "jwtRefreshToken"
CHARGER_JWT_TTL = "jwtTokenTtl"
CHARGER_JWT_REFRESH_TTL = "jwtRefreshTokenTtl"
class ChargerStatus(StrEnum):

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from collections.abc import Callable
from datetime import timedelta
from datetime import datetime, timedelta
from http import HTTPStatus
import logging
from typing import Any, Concatenate
@@ -27,6 +27,10 @@ from .const import (
CHARGER_ECO_SMART_STATUS_KEY,
CHARGER_ENERGY_PRICE_KEY,
CHARGER_FEATURES_KEY,
CHARGER_JWT_REFRESH_TOKEN,
CHARGER_JWT_REFRESH_TTL,
CHARGER_JWT_TOKEN,
CHARGER_JWT_TTL,
CHARGER_LOCKED_UNLOCKED_KEY,
CHARGER_MAX_CHARGING_CURRENT_KEY,
CHARGER_MAX_CHARGING_CURRENT_POST_KEY,
@@ -86,27 +90,25 @@ def _require_authentication[_WallboxCoordinatorT: WallboxCoordinator, **_P](
) -> Callable[Concatenate[_WallboxCoordinatorT, _P], Any]:
"""Authenticate with decorator using Wallbox API."""
def require_authentication(
async def require_authentication(
self: _WallboxCoordinatorT, *args: _P.args, **kwargs: _P.kwargs
) -> Any:
"""Authenticate using Wallbox API."""
try:
self.authenticate()
return func(self, *args, **kwargs)
except requests.exceptions.HTTPError as wallbox_connection_error:
if wallbox_connection_error.response.status_code == HTTPStatus.FORBIDDEN:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN, translation_key="invalid_auth"
) from wallbox_connection_error
raise HomeAssistantError(
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
await self.async_authenticate()
return await func(self, *args, **kwargs)
return require_authentication
def check_token_validity(jwt_token_ttl: int, jwt_token_drift: int) -> bool:
"""Check if the jwtToken is still valid in order to reuse if possible."""
return round((jwt_token_ttl / 1000) - jwt_token_drift, 0) > datetime.timestamp(
datetime.now()
)
def _validate(wallbox: Wallbox) -> None:
"""Authenticate using Wallbox API."""
"""Authenticate using Wallbox API to check if the used credentials are valid."""
try:
wallbox.authenticate()
except requests.exceptions.HTTPError as wallbox_connection_error:
@@ -142,11 +144,38 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
update_interval=timedelta(seconds=UPDATE_INTERVAL),
)
def authenticate(self) -> None:
"""Authenticate using Wallbox API."""
self._wallbox.authenticate()
def _authenticate(self) -> dict[str, str]:
"""Authenticate using Wallbox API. First check token validity."""
data = dict(self.config_entry.data)
if not check_token_validity(
jwt_token_ttl=data.get(CHARGER_JWT_TTL, 0),
jwt_token_drift=UPDATE_INTERVAL,
):
try:
self._wallbox.authenticate()
except requests.exceptions.HTTPError as wallbox_connection_error:
if (
wallbox_connection_error.response.status_code
== HTTPStatus.FORBIDDEN
):
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN, translation_key="invalid_auth"
) from wallbox_connection_error
raise HomeAssistantError(
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
else:
data[CHARGER_JWT_TOKEN] = self._wallbox.jwtToken
data[CHARGER_JWT_REFRESH_TOKEN] = self._wallbox.jwtRefreshToken
data[CHARGER_JWT_TTL] = self._wallbox.jwtTokenTtl
data[CHARGER_JWT_REFRESH_TTL] = self._wallbox.jwtRefreshTokenTtl
return data
async def async_authenticate(self) -> None:
"""Authenticate using Wallbox API."""
data = await self.hass.async_add_executor_job(self._authenticate)
self.hass.config_entries.async_update_entry(self.config_entry, data=data)
@_require_authentication
def _get_data(self) -> dict[str, Any]:
"""Get new sensor data for Wallbox component."""
try:
@@ -208,6 +237,7 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
@_require_authentication
async def _async_update_data(self) -> dict[str, Any]:
"""Get new sensor data for Wallbox component. Set update interval to be UPDATE_INTERVAL * #wallbox chargers configured, this is necessary due to rate limitations."""
@@ -217,7 +247,6 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
)
return await self.hass.async_add_executor_job(self._get_data)
@_require_authentication
def _set_charging_current(
self, charging_current: float
) -> dict[str, dict[str, dict[str, Any]]]:
@@ -246,6 +275,7 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
@_require_authentication
async def async_set_charging_current(self, charging_current: float) -> None:
"""Set maximum charging current for Wallbox."""
data = await self.hass.async_add_executor_job(
@@ -253,7 +283,6 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
)
self.async_set_updated_data(data)
@_require_authentication
def _set_icp_current(self, icp_current: float) -> dict[str, Any]:
"""Set maximum icp current for Wallbox."""
try:
@@ -276,6 +305,7 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
@_require_authentication
async def async_set_icp_current(self, icp_current: float) -> None:
"""Set maximum icp current for Wallbox."""
data = await self.hass.async_add_executor_job(
@@ -283,7 +313,6 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
)
self.async_set_updated_data(data)
@_require_authentication
def _set_energy_cost(self, energy_cost: float) -> dict[str, Any]:
"""Set energy cost for Wallbox."""
try:
@@ -300,6 +329,7 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
@_require_authentication
async def async_set_energy_cost(self, energy_cost: float) -> None:
"""Set energy cost for Wallbox."""
data = await self.hass.async_add_executor_job(
@@ -307,7 +337,6 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
)
self.async_set_updated_data(data)
@_require_authentication
def _set_lock_unlock(self, lock: bool) -> dict[str, dict[str, dict[str, Any]]]:
"""Set wallbox to locked or unlocked."""
try:
@@ -335,12 +364,12 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
@_require_authentication
async def async_set_lock_unlock(self, lock: bool) -> None:
"""Set wallbox to locked or unlocked."""
data = await self.hass.async_add_executor_job(self._set_lock_unlock, lock)
self.async_set_updated_data(data)
@_require_authentication
def _pause_charger(self, pause: bool) -> None:
"""Set wallbox to pause or resume."""
try:
@@ -357,12 +386,12 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
@_require_authentication
async def async_pause_charger(self, pause: bool) -> None:
"""Set wallbox to pause or resume."""
await self.hass.async_add_executor_job(self._pause_charger, pause)
await self.async_request_refresh()
@_require_authentication
def _set_eco_smart(self, option: str) -> None:
"""Set wallbox solar charging mode."""
try:
@@ -381,6 +410,7 @@ class WallboxCoordinator(DataUpdateCoordinator[dict[str, Any]]):
translation_domain=DOMAIN, translation_key="api_failed"
) from wallbox_connection_error
@_require_authentication
async def async_set_eco_smart(self, option: str) -> None:
"""Set wallbox solar charging mode."""

View File

@@ -1,5 +1,6 @@
"""Test fixtures for the Wallbox integration."""
from datetime import datetime, timedelta
from http import HTTPStatus
from unittest.mock import MagicMock, Mock, patch
@@ -10,6 +11,10 @@ from homeassistant.components.wallbox.const import (
CHARGER_DATA_POST_L1_KEY,
CHARGER_DATA_POST_L2_KEY,
CHARGER_ENERGY_PRICE_KEY,
CHARGER_JWT_REFRESH_TOKEN,
CHARGER_JWT_REFRESH_TTL,
CHARGER_JWT_TOKEN,
CHARGER_JWT_TTL,
CHARGER_LOCKED_UNLOCKED_KEY,
CHARGER_MAX_CHARGING_CURRENT_POST_KEY,
CHARGER_MAX_ICP_CURRENT_KEY,
@@ -43,6 +48,14 @@ def entry(hass: HomeAssistant) -> MockConfigEntry:
CONF_USERNAME: "test_username",
CONF_PASSWORD: "test_password",
CONF_STATION: "12345",
CHARGER_JWT_TOKEN: "test_token",
CHARGER_JWT_REFRESH_TOKEN: "test_refresh_token",
CHARGER_JWT_TTL: (
datetime.timestamp(datetime.now() + timedelta(hours=1)) * 1000
),
CHARGER_JWT_REFRESH_TTL: (
datetime.timestamp(datetime.now() + timedelta(hours=1)) * 1000
),
},
entry_id="testEntry",
)
@@ -82,6 +95,14 @@ def mock_wallbox():
)
wallbox.setIcpMaxCurrent = Mock(return_value={CHARGER_MAX_ICP_CURRENT_KEY: 25})
wallbox.getChargerStatus = Mock(return_value=WALLBOX_STATUS_RESPONSE)
wallbox.jwtToken = "test_token"
wallbox.jwtRefreshToken = "test_refresh_token"
wallbox.jwtTokenTtl = (
datetime.timestamp(datetime.now() + timedelta(hours=1)) * 1000
)
wallbox.jwtRefreshTokenTtl = (
datetime.timestamp(datetime.now() + timedelta(hours=1)) * 1000
)
mock.return_value = wallbox
yield wallbox

View File

@@ -9,19 +9,20 @@ from homeassistant.components.wallbox.const import (
CHARGER_CHARGING_POWER_KEY,
CHARGER_CHARGING_SPEED_KEY,
CHARGER_DATA_KEY,
CHARGER_JWT_REFRESH_TOKEN,
CHARGER_JWT_TOKEN,
CHARGER_MAX_AVAILABLE_POWER_KEY,
CHARGER_MAX_CHARGING_CURRENT_KEY,
CONF_STATION,
DOMAIN,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .conftest import http_403_error, http_404_error, setup_integration
from .const import (
WALLBOX_AUTHORISATION_RESPONSE,
WALLBOX_AUTHORISATION_RESPONSE_UNAUTHORISED,
)
from .const import WALLBOX_AUTHORISATION_RESPONSE_UNAUTHORISED
from tests.common import MockConfigEntry
@@ -62,9 +63,9 @@ async def test_form_cannot_authenticate(hass: HomeAssistant) -> None:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"station": "12345",
"username": "test-username",
"password": "test-password",
CONF_STATION: "12345",
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
},
)
@@ -90,9 +91,9 @@ async def test_form_cannot_connect(hass: HomeAssistant) -> None:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"station": "12345",
"username": "test-username",
"password": "test-password",
CONF_STATION: "12345",
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
},
)
@@ -100,32 +101,33 @@ async def test_form_cannot_connect(hass: HomeAssistant) -> None:
assert result2["errors"] == {"base": "cannot_connect"}
async def test_form_validate_input(hass: HomeAssistant) -> None:
async def test_form_validate_input(
hass: HomeAssistant, entry: MockConfigEntry, mock_wallbox
) -> None:
"""Test we can validate input."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with (
patch(
"homeassistant.components.wallbox.Wallbox.authenticate",
return_value=WALLBOX_AUTHORISATION_RESPONSE,
),
patch(
"homeassistant.components.wallbox.Wallbox.pauseChargingSession",
return_value=test_response,
"homeassistant.components.wallbox.config_flow.Wallbox",
return_value=mock_wallbox,
),
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"station": "12345",
"username": "test-username",
"password": "test-password",
CONF_STATION: "12345",
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
},
)
assert result2["title"] == "Wallbox Portal"
assert result2["data"]["station"] == "12345"
assert result2["data"][CONF_STATION] == "12345"
assert result2["data"][CONF_USERNAME] == "test-username"
assert result2["data"][CHARGER_JWT_TOKEN] == "test_token"
assert result2["data"][CHARGER_JWT_REFRESH_TOKEN] == "test_refresh_token"
async def test_form_reauth(
@@ -148,9 +150,9 @@ async def test_form_reauth(
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"station": "12345",
"username": "test-username",
"password": "test-password",
CONF_STATION: "12345",
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
},
)
@@ -181,9 +183,9 @@ async def test_form_reauth_invalid(
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"station": "12345678",
"username": "test-username",
"password": "test-password",
CONF_STATION: "12345678",
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
},
)

View File

@@ -6,10 +6,11 @@ from unittest.mock import patch
import pytest
from homeassistant.components.input_number import ATTR_VALUE, SERVICE_SET_VALUE
from homeassistant.components.wallbox.const import CHARGER_JWT_TTL
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.exceptions import ConfigEntryAuthFailed, HomeAssistantError
from .conftest import http_403_error, http_429_error, setup_integration
from .const import (
@@ -32,18 +33,6 @@ async def test_wallbox_setup_unload_entry(
assert entry.state is ConfigEntryState.NOT_LOADED
async def test_wallbox_unload_entry_connection_error(
hass: HomeAssistant, entry: MockConfigEntry, mock_wallbox
) -> None:
"""Test Wallbox Unload Connection Error."""
with patch.object(mock_wallbox, "authenticate", side_effect=http_403_error):
await setup_integration(hass, entry)
assert entry.state is ConfigEntryState.SETUP_ERROR
assert await hass.config_entries.async_unload(entry.entry_id)
assert entry.state is ConfigEntryState.NOT_LOADED
async def test_wallbox_refresh_failed_connection_error_too_many_requests(
hass: HomeAssistant, entry: MockConfigEntry, mock_wallbox
) -> None:
@@ -69,9 +58,15 @@ async def test_wallbox_refresh_failed_error_auth(
await setup_integration(hass, entry)
assert entry.state is ConfigEntryState.LOADED
data = dict(entry.data)
data[CHARGER_JWT_TTL] = (
datetime.timestamp(datetime.now() - timedelta(hours=1)) * 1000
)
hass.config_entries.async_update_entry(entry, data=data)
with (
patch.object(mock_wallbox, "authenticate", side_effect=http_403_error),
pytest.raises(HomeAssistantError),
pytest.raises(ConfigEntryAuthFailed),
):
await hass.services.async_call(
"number",