1
0
mirror of https://github.com/home-assistant/core.git synced 2026-04-02 08:26:41 +01:00

Fix Tibber update token (#164295)

Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
This commit is contained in:
Daniel Hjelseth Høyer
2026-03-17 15:11:51 +01:00
committed by GitHub
parent 51c3397be8
commit 35a99dd4a4
4 changed files with 271 additions and 61 deletions

View File

@@ -2,9 +2,10 @@
from __future__ import annotations
from datetime import timedelta
import asyncio
from datetime import datetime, timedelta
import logging
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, TypedDict, cast
from aiohttp.client_exceptions import ClientError
import tibber
@@ -38,6 +39,58 @@ FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__)
class TibberHomeData(TypedDict):
"""Data for a Tibber home used by the price sensor."""
currency: str
price_unit: str
current_price: float | None
current_price_time: datetime | None
intraday_price_ranking: float | None
max_price: float
avg_price: float
min_price: float
off_peak_1: float
peak: float
off_peak_2: float
month_cost: float | None
peak_hour: float | None
peak_hour_time: datetime | None
month_cons: float | None
app_nickname: str | None
grid_company: str | None
estimated_annual_consumption: int | None
def _build_home_data(home: tibber.TibberHome) -> TibberHomeData:
"""Build TibberHomeData from a TibberHome for the price sensor."""
current_price, last_updated, price_rank = home.current_price_data()
attributes = home.current_attributes()
result: TibberHomeData = {
"currency": home.currency,
"price_unit": home.price_unit,
"current_price": current_price,
"current_price_time": last_updated,
"intraday_price_ranking": price_rank,
"max_price": attributes["max_price"],
"avg_price": attributes["avg_price"],
"min_price": attributes["min_price"],
"off_peak_1": attributes["off_peak_1"],
"peak": attributes["peak"],
"off_peak_2": attributes["off_peak_2"],
"month_cost": home.month_cost,
"peak_hour": home.peak_hour,
"peak_hour_time": home.peak_hour_time,
"month_cons": home.month_cons,
"app_nickname": home.info["viewer"]["home"].get("appNickname"),
"grid_company": home.info["viewer"]["home"]["meteringPointData"]["gridCompany"],
"estimated_annual_consumption": home.info["viewer"]["home"][
"meteringPointData"
]["estimatedAnnualConsumption"],
}
return result
class TibberDataCoordinator(DataUpdateCoordinator[None]):
"""Handle Tibber data and insert statistics."""
@@ -57,13 +110,16 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
name=f"Tibber {tibber_connection.name}",
update_interval=timedelta(minutes=20),
)
self._tibber_connection = tibber_connection
async def _async_update_data(self) -> None:
"""Update data via API."""
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
try:
await self._tibber_connection.fetch_consumption_data_active_homes()
await self._tibber_connection.fetch_production_data_active_homes()
await tibber_connection.fetch_consumption_data_active_homes()
await tibber_connection.fetch_production_data_active_homes()
await self._insert_statistics()
except tibber.RetryableHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
@@ -75,7 +131,10 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
async def _insert_statistics(self) -> None:
"""Insert Tibber statistics."""
for home in self._tibber_connection.get_homes():
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
for home in tibber_connection.get_homes():
sensors: list[tuple[str, bool, str | None, str]] = []
if home.hourly_consumption_data:
sensors.append(
@@ -194,6 +253,76 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
async_add_external_statistics(self.hass, metadata, statistics)
class TibberPriceCoordinator(DataUpdateCoordinator[dict[str, TibberHomeData]]):
"""Handle Tibber price data and insert statistics."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: TibberConfigEntry,
) -> None:
"""Initialize the price coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=f"{DOMAIN} price",
update_interval=timedelta(minutes=1),
)
def _seconds_until_next_15_minute(self) -> float:
"""Return seconds until the next 15-minute boundary (0, 15, 30, 45) in UTC."""
now = dt_util.utcnow()
next_minute = ((now.minute // 15) + 1) * 15
if next_minute >= 60:
next_run = now.replace(minute=0, second=0, microsecond=0) + timedelta(
hours=1
)
else:
next_run = now.replace(
minute=next_minute, second=0, microsecond=0, tzinfo=dt_util.UTC
)
return (next_run - now).total_seconds()
async def _async_update_data(self) -> dict[str, TibberHomeData]:
"""Update data via API and return per-home data for sensors."""
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
active_homes = tibber_connection.get_homes(only_active=True)
try:
await asyncio.gather(
tibber_connection.fetch_consumption_data_active_homes(),
tibber_connection.fetch_production_data_active_homes(),
)
now = dt_util.now()
homes_to_update = [
home
for home in active_homes
if (
(last_data_timestamp := home.last_data_timestamp) is None
or (last_data_timestamp - now).total_seconds() < 11 * 3600
)
]
if homes_to_update:
await asyncio.gather(
*(home.update_info_and_price_info() for home in homes_to_update)
)
except tibber.RetryableHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
except tibber.FatalHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
result = {home.home_id: _build_home_data(home) for home in active_homes}
self.update_interval = timedelta(seconds=self._seconds_until_next_15_minute())
return result
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
"""Fetch and cache Tibber Data API device capabilities."""

View File

@@ -3,10 +3,8 @@
from __future__ import annotations
from collections.abc import Callable
import datetime
from datetime import timedelta
import logging
from random import randrange
from typing import Any
import aiohttp
@@ -42,18 +40,20 @@ from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from homeassistant.util import Throttle, dt as dt_util
from homeassistant.util import dt as dt_util
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
from .coordinator import (
TibberDataAPICoordinator,
TibberDataCoordinator,
TibberPriceCoordinator,
)
_LOGGER = logging.getLogger(__name__)
ICON = "mdi:currency-usd"
SCAN_INTERVAL = timedelta(minutes=1)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
PARALLEL_UPDATES = 0
TWENTY_MINUTES = 20 * 60
RT_SENSORS_UNIQUE_ID_MIGRATION = {
"accumulated_consumption_last_hour": "accumulated consumption current hour",
@@ -610,6 +610,7 @@ async def _async_setup_graphql_sensors(
entity_registry = er.async_get(hass)
coordinator: TibberDataCoordinator | None = None
price_coordinator: TibberPriceCoordinator | None = None
entities: list[TibberSensor] = []
for home in tibber_connection.get_homes(only_active=False):
try:
@@ -626,7 +627,9 @@ async def _async_setup_graphql_sensors(
raise PlatformNotReady from err
if home.has_active_subscription:
entities.append(TibberSensorElPrice(home))
if price_coordinator is None:
price_coordinator = TibberPriceCoordinator(hass, entry)
entities.append(TibberSensorElPrice(price_coordinator, home))
if coordinator is None:
coordinator = TibberDataCoordinator(hass, entry, tibber_connection)
entities.extend(
@@ -737,19 +740,21 @@ class TibberSensor(SensorEntity):
return device_info
class TibberSensorElPrice(TibberSensor):
class TibberSensorElPrice(TibberSensor, CoordinatorEntity[TibberPriceCoordinator]):
"""Representation of a Tibber sensor for el price."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_translation_key = "electricity_price"
def __init__(self, tibber_home: TibberHome) -> None:
def __init__(
self,
coordinator: TibberPriceCoordinator,
tibber_home: TibberHome,
) -> None:
"""Initialize the sensor."""
super().__init__(tibber_home=tibber_home)
self._last_updated: datetime.datetime | None = None
self._spread_load_constant = randrange(TWENTY_MINUTES)
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
self._attr_available = False
self._attr_native_unit_of_measurement = tibber_home.price_unit
self._attr_extra_state_attributes = {
"app_nickname": None,
"grid_company": None,
@@ -768,51 +773,38 @@ class TibberSensorElPrice(TibberSensor):
self._device_name = self._home_name
async def async_update(self) -> None:
"""Get the latest data and updates the states."""
now = dt_util.now()
if (
not self._tibber_home.last_data_timestamp
or (self._tibber_home.last_data_timestamp - now).total_seconds()
< 10 * 3600 - self._spread_load_constant
or not self.available
):
_LOGGER.debug("Asking for new data")
await self._fetch_data()
elif (
self._tibber_home.price_total
and self._last_updated
and self._last_updated.hour == now.hour
and now - self._last_updated < timedelta(minutes=15)
and self._tibber_home.last_data_timestamp
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
data = self.coordinator.data
if not data or (
(home_data := data.get(self._tibber_home.home_id)) is None
or (current_price := home_data.get("current_price")) is None
):
self._attr_available = False
self.async_write_ha_state()
return
res = self._tibber_home.current_price_data()
self._attr_native_value, self._last_updated, price_rank = res
self._attr_extra_state_attributes["intraday_price_ranking"] = price_rank
attrs = self._tibber_home.current_attributes()
self._attr_extra_state_attributes.update(attrs)
self._attr_available = self._attr_native_value is not None
self._attr_native_unit_of_measurement = self._tibber_home.price_unit
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def _fetch_data(self) -> None:
_LOGGER.debug("Fetching data")
try:
await self._tibber_home.update_info_and_price_info()
except TimeoutError, aiohttp.ClientError:
return
data = self._tibber_home.info["viewer"]["home"]
self._attr_extra_state_attributes["app_nickname"] = data["appNickname"]
self._attr_extra_state_attributes["grid_company"] = data["meteringPointData"][
"gridCompany"
self._attr_native_unit_of_measurement = home_data.get(
"price_unit", self._tibber_home.price_unit
)
self._attr_native_value = current_price
self._attr_extra_state_attributes["intraday_price_ranking"] = home_data.get(
"intraday_price_ranking"
)
self._attr_extra_state_attributes["max_price"] = home_data["max_price"]
self._attr_extra_state_attributes["avg_price"] = home_data["avg_price"]
self._attr_extra_state_attributes["min_price"] = home_data["min_price"]
self._attr_extra_state_attributes["off_peak_1"] = home_data["off_peak_1"]
self._attr_extra_state_attributes["peak"] = home_data["peak"]
self._attr_extra_state_attributes["off_peak_2"] = home_data["off_peak_2"]
self._attr_extra_state_attributes["app_nickname"] = home_data["app_nickname"]
self._attr_extra_state_attributes["grid_company"] = home_data["grid_company"]
self._attr_extra_state_attributes["estimated_annual_consumption"] = home_data[
"estimated_annual_consumption"
]
self._attr_extra_state_attributes["estimated_annual_consumption"] = data[
"meteringPointData"
]["estimatedAnnualConsumption"]
self._attr_available = True
self.async_write_ha_state()
class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, MagicMock
import pytest
@@ -10,12 +10,97 @@ from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber.const import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_component import async_update_entity
from homeassistant.util import dt as dt_util
from .conftest import create_tibber_device
from tests.common import MockConfigEntry
def _create_home(*, current_price: float | None = 1.25) -> MagicMock:
"""Create a mocked Tibber home with an active subscription."""
home = MagicMock()
home.home_id = "home-id"
home.name = "Home"
home.currency = "NOK"
home.price_unit = "NOK/kWh"
home.has_active_subscription = True
home.has_real_time_consumption = False
home.last_data_timestamp = None
home.update_info = AsyncMock(return_value=None)
home.update_info_and_price_info = AsyncMock(return_value=None)
home.current_price_data = MagicMock(
return_value=(current_price, dt_util.utcnow(), 0.4)
)
home.current_attributes = MagicMock(
return_value={
"max_price": 1.8,
"avg_price": 1.2,
"min_price": 0.8,
"off_peak_1": 0.9,
"peak": 1.7,
"off_peak_2": 1.0,
}
)
home.month_cost = 111.1
home.peak_hour = 2.5
home.peak_hour_time = dt_util.utcnow()
home.month_cons = 222.2
home.hourly_consumption_data = []
home.hourly_production_data = []
home.info = {
"viewer": {
"home": {
"appNickname": "Home",
"address": {"address1": "Street 1"},
"meteringPointData": {
"gridCompany": "GridCo",
"estimatedAnnualConsumption": 12000,
},
}
}
}
return home
async def test_price_sensor_state_unit_and_attributes(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry: MockConfigEntry,
tibber_mock: MagicMock,
setup_credentials: None,
entity_registry: er.EntityRegistry,
) -> None:
"""Test price sensor state and attributes."""
home = _create_home(current_price=1.25)
tibber_mock.get_homes.return_value = [home]
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, home.home_id)
assert entity_id is not None
await async_update_entity(hass, entity_id)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state is not None
assert float(state.state) == 1.25
assert state.attributes["unit_of_measurement"] == "NOK/kWh"
assert state.attributes["app_nickname"] == "Home"
assert state.attributes["grid_company"] == "GridCo"
assert state.attributes["estimated_annual_consumption"] == 12000
assert state.attributes["intraday_price_ranking"] == 0.4
assert state.attributes["max_price"] == 1.8
assert state.attributes["avg_price"] == 1.2
assert state.attributes["min_price"] == 0.8
assert state.attributes["off_peak_1"] == 0.9
assert state.attributes["peak"] == 1.7
assert state.attributes["off_peak_2"] == 1.0
async def test_data_api_sensors_are_created(
recorder_mock: Recorder,
hass: HomeAssistant,

View File

@@ -24,6 +24,10 @@ async def test_async_setup_entry(
tibber_connection.fetch_production_data_active_homes.return_value = None
tibber_connection.get_homes = mock_get_homes
runtime_data = AsyncMock()
runtime_data.async_get_client.return_value = tibber_connection
config_entry.runtime_data = runtime_data
coordinator = TibberDataCoordinator(hass, config_entry, tibber_connection)
await coordinator._async_update_data()
await async_wait_recording_done(hass)