1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-21 03:20:01 +00:00
Files
core/homeassistant/components/knx/repairs.py
2025-12-18 18:23:30 +01:00

176 lines
6.3 KiB
Python

"""Repairs for KNX integration."""
from __future__ import annotations
from collections.abc import Callable
from functools import partial
from typing import TYPE_CHECKING, Any, Final
import voluptuous as vol
from xknx.exceptions.exception import InvalidSecureConfiguration
from xknx.telegram import GroupAddress, IndividualAddress, Telegram
from homeassistant import data_entry_flow
from homeassistant.components.repairs import RepairsFlow
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import issue_registry as ir, selector
from homeassistant.helpers.dispatcher import async_dispatcher_connect
if TYPE_CHECKING:
from .knx_module import KNXModule
from .const import (
CONF_KNX_KNXKEY_PASSWORD,
DOMAIN,
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
KNXConfigEntryData,
)
from .storage.keyring import DEFAULT_KNX_KEYRING_FILENAME, save_uploaded_knxkeys_file
from .telegrams import SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM, TelegramDict
CONF_KEYRING_FILE: Final = "knxkeys_file"
async def async_create_fix_flow(
hass: HomeAssistant,
issue_id: str,
data: dict[str, str | int | float | None] | None,
) -> RepairsFlow:
"""Create flow."""
if issue_id == REPAIR_ISSUE_DATA_SECURE_GROUP_KEY:
return DataSecureGroupIssueRepairFlow()
# If KNX adds confirm-only repairs in the future, this should be changed
# to return a ConfirmRepairFlow instead of raising a ValueError
raise ValueError(f"unknown repair {issue_id}")
######################
# DataSecure key issue
######################
@callback
def data_secure_group_key_issue_dispatcher(knx_module: KNXModule) -> Callable[[], None]:
"""Watcher for DataSecure group key issues."""
return async_dispatcher_connect(
knx_module.hass,
signal=SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM,
target=partial(_data_secure_group_key_issue_handler, knx_module),
)
@callback
def _data_secure_group_key_issue_handler(
knx_module: KNXModule, telegram: Telegram, telegram_dict: TelegramDict
) -> None:
"""Handle DataSecure group key issue telegrams."""
if telegram.destination_address not in knx_module.group_address_entities:
# Only report issues for configured group addresses
return
issue_registry = ir.async_get(knx_module.hass)
new_ga = str(telegram.destination_address)
new_ia = str(telegram.source_address)
new_data = {new_ga: new_ia}
if existing_issue := issue_registry.async_get_issue(
DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY
):
assert isinstance(existing_issue.data, dict)
existing_data: dict[str, str] = existing_issue.data # type: ignore[assignment]
if new_ga in existing_data:
current_ias = existing_data[new_ga].split(", ")
if new_ia in current_ias:
return
current_ias = sorted([*current_ias, new_ia], key=IndividualAddress)
new_data[new_ga] = ", ".join(current_ias)
new_data_unsorted = existing_data | new_data
new_data = {
key: new_data_unsorted[key]
for key in sorted(new_data_unsorted, key=GroupAddress)
}
issue_registry.async_get_or_create(
DOMAIN,
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
data=new_data, # type: ignore[arg-type]
is_fixable=True,
is_persistent=True,
severity=ir.IssueSeverity.ERROR,
translation_key=REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
translation_placeholders={
"addresses": "\n".join(
f"`{ga}` from {ias}" for ga, ias in new_data.items()
),
"interface": str(knx_module.xknx.current_address),
},
)
class DataSecureGroupIssueRepairFlow(RepairsFlow):
"""Handler for an issue fixing flow for outdated DataSecure keys."""
@callback
def _async_get_placeholders(self) -> dict[str, str]:
issue_registry = ir.async_get(self.hass)
issue = issue_registry.async_get_issue(self.handler, self.issue_id)
assert issue is not None
return issue.translation_placeholders or {}
async def async_step_init(
self, user_input: dict[str, str] | None = None
) -> data_entry_flow.FlowResult:
"""Handle the first step of a fix flow."""
return await self.async_step_secure_knxkeys()
async def async_step_secure_knxkeys(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Manage upload of new KNX Keyring file."""
errors: dict[str, str] = {}
if user_input is not None:
password = user_input[CONF_KNX_KNXKEY_PASSWORD]
keyring = None
try:
keyring = await save_uploaded_knxkeys_file(
self.hass,
uploaded_file_id=user_input[CONF_KEYRING_FILE],
password=password,
)
except InvalidSecureConfiguration:
errors[CONF_KNX_KNXKEY_PASSWORD] = "keyfile_invalid_signature"
if not errors and keyring:
new_entry_data = KNXConfigEntryData(
knxkeys_filename=f"{DOMAIN}/{DEFAULT_KNX_KEYRING_FILENAME}",
knxkeys_password=password,
)
return self.finish_flow(new_entry_data)
fields = {
vol.Required(CONF_KEYRING_FILE): selector.FileSelector(
config=selector.FileSelectorConfig(accept=".knxkeys")
),
vol.Required(CONF_KNX_KNXKEY_PASSWORD): selector.TextSelector(),
}
return self.async_show_form(
step_id="secure_knxkeys",
data_schema=vol.Schema(fields),
description_placeholders=self._async_get_placeholders(),
errors=errors,
)
@callback
def finish_flow(
self, new_entry_data: KNXConfigEntryData
) -> data_entry_flow.FlowResult:
"""Finish the repair flow. Reload the config entry."""
knx_config_entries = self.hass.config_entries.async_entries(DOMAIN)
if knx_config_entries:
config_entry = knx_config_entries[0] # single_config_entry
new_data = {**config_entry.data, **new_entry_data}
self.hass.config_entries.async_update_entry(config_entry, data=new_data)
self.hass.config_entries.async_schedule_reload(config_entry.entry_id)
return self.async_create_entry(data={})