mirror of
https://github.com/home-assistant/core.git
synced 2025-12-21 03:20:01 +00:00
176 lines
6.3 KiB
Python
176 lines
6.3 KiB
Python
"""Repairs for KNX integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
from functools import partial
|
|
from typing import TYPE_CHECKING, Any, Final
|
|
|
|
import voluptuous as vol
|
|
from xknx.exceptions.exception import InvalidSecureConfiguration
|
|
from xknx.telegram import GroupAddress, IndividualAddress, Telegram
|
|
|
|
from homeassistant import data_entry_flow
|
|
from homeassistant.components.repairs import RepairsFlow
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers import issue_registry as ir, selector
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
|
|
if TYPE_CHECKING:
|
|
from .knx_module import KNXModule
|
|
|
|
from .const import (
|
|
CONF_KNX_KNXKEY_PASSWORD,
|
|
DOMAIN,
|
|
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
|
|
KNXConfigEntryData,
|
|
)
|
|
from .storage.keyring import DEFAULT_KNX_KEYRING_FILENAME, save_uploaded_knxkeys_file
|
|
from .telegrams import SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM, TelegramDict
|
|
|
|
CONF_KEYRING_FILE: Final = "knxkeys_file"
|
|
|
|
|
|
async def async_create_fix_flow(
|
|
hass: HomeAssistant,
|
|
issue_id: str,
|
|
data: dict[str, str | int | float | None] | None,
|
|
) -> RepairsFlow:
|
|
"""Create flow."""
|
|
if issue_id == REPAIR_ISSUE_DATA_SECURE_GROUP_KEY:
|
|
return DataSecureGroupIssueRepairFlow()
|
|
# If KNX adds confirm-only repairs in the future, this should be changed
|
|
# to return a ConfirmRepairFlow instead of raising a ValueError
|
|
raise ValueError(f"unknown repair {issue_id}")
|
|
|
|
|
|
######################
|
|
# DataSecure key issue
|
|
######################
|
|
|
|
|
|
@callback
|
|
def data_secure_group_key_issue_dispatcher(knx_module: KNXModule) -> Callable[[], None]:
|
|
"""Watcher for DataSecure group key issues."""
|
|
return async_dispatcher_connect(
|
|
knx_module.hass,
|
|
signal=SIGNAL_KNX_DATA_SECURE_ISSUE_TELEGRAM,
|
|
target=partial(_data_secure_group_key_issue_handler, knx_module),
|
|
)
|
|
|
|
|
|
@callback
|
|
def _data_secure_group_key_issue_handler(
|
|
knx_module: KNXModule, telegram: Telegram, telegram_dict: TelegramDict
|
|
) -> None:
|
|
"""Handle DataSecure group key issue telegrams."""
|
|
if telegram.destination_address not in knx_module.group_address_entities:
|
|
# Only report issues for configured group addresses
|
|
return
|
|
|
|
issue_registry = ir.async_get(knx_module.hass)
|
|
new_ga = str(telegram.destination_address)
|
|
new_ia = str(telegram.source_address)
|
|
new_data = {new_ga: new_ia}
|
|
|
|
if existing_issue := issue_registry.async_get_issue(
|
|
DOMAIN, REPAIR_ISSUE_DATA_SECURE_GROUP_KEY
|
|
):
|
|
assert isinstance(existing_issue.data, dict)
|
|
existing_data: dict[str, str] = existing_issue.data # type: ignore[assignment]
|
|
if new_ga in existing_data:
|
|
current_ias = existing_data[new_ga].split(", ")
|
|
if new_ia in current_ias:
|
|
return
|
|
current_ias = sorted([*current_ias, new_ia], key=IndividualAddress)
|
|
new_data[new_ga] = ", ".join(current_ias)
|
|
new_data_unsorted = existing_data | new_data
|
|
new_data = {
|
|
key: new_data_unsorted[key]
|
|
for key in sorted(new_data_unsorted, key=GroupAddress)
|
|
}
|
|
|
|
issue_registry.async_get_or_create(
|
|
DOMAIN,
|
|
REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
|
|
data=new_data, # type: ignore[arg-type]
|
|
is_fixable=True,
|
|
is_persistent=True,
|
|
severity=ir.IssueSeverity.ERROR,
|
|
translation_key=REPAIR_ISSUE_DATA_SECURE_GROUP_KEY,
|
|
translation_placeholders={
|
|
"addresses": "\n".join(
|
|
f"`{ga}` from {ias}" for ga, ias in new_data.items()
|
|
),
|
|
"interface": str(knx_module.xknx.current_address),
|
|
},
|
|
)
|
|
|
|
|
|
class DataSecureGroupIssueRepairFlow(RepairsFlow):
|
|
"""Handler for an issue fixing flow for outdated DataSecure keys."""
|
|
|
|
@callback
|
|
def _async_get_placeholders(self) -> dict[str, str]:
|
|
issue_registry = ir.async_get(self.hass)
|
|
issue = issue_registry.async_get_issue(self.handler, self.issue_id)
|
|
assert issue is not None
|
|
return issue.translation_placeholders or {}
|
|
|
|
async def async_step_init(
|
|
self, user_input: dict[str, str] | None = None
|
|
) -> data_entry_flow.FlowResult:
|
|
"""Handle the first step of a fix flow."""
|
|
return await self.async_step_secure_knxkeys()
|
|
|
|
async def async_step_secure_knxkeys(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> data_entry_flow.FlowResult:
|
|
"""Manage upload of new KNX Keyring file."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
password = user_input[CONF_KNX_KNXKEY_PASSWORD]
|
|
keyring = None
|
|
try:
|
|
keyring = await save_uploaded_knxkeys_file(
|
|
self.hass,
|
|
uploaded_file_id=user_input[CONF_KEYRING_FILE],
|
|
password=password,
|
|
)
|
|
except InvalidSecureConfiguration:
|
|
errors[CONF_KNX_KNXKEY_PASSWORD] = "keyfile_invalid_signature"
|
|
|
|
if not errors and keyring:
|
|
new_entry_data = KNXConfigEntryData(
|
|
knxkeys_filename=f"{DOMAIN}/{DEFAULT_KNX_KEYRING_FILENAME}",
|
|
knxkeys_password=password,
|
|
)
|
|
return self.finish_flow(new_entry_data)
|
|
|
|
fields = {
|
|
vol.Required(CONF_KEYRING_FILE): selector.FileSelector(
|
|
config=selector.FileSelectorConfig(accept=".knxkeys")
|
|
),
|
|
vol.Required(CONF_KNX_KNXKEY_PASSWORD): selector.TextSelector(),
|
|
}
|
|
return self.async_show_form(
|
|
step_id="secure_knxkeys",
|
|
data_schema=vol.Schema(fields),
|
|
description_placeholders=self._async_get_placeholders(),
|
|
errors=errors,
|
|
)
|
|
|
|
@callback
|
|
def finish_flow(
|
|
self, new_entry_data: KNXConfigEntryData
|
|
) -> data_entry_flow.FlowResult:
|
|
"""Finish the repair flow. Reload the config entry."""
|
|
knx_config_entries = self.hass.config_entries.async_entries(DOMAIN)
|
|
if knx_config_entries:
|
|
config_entry = knx_config_entries[0] # single_config_entry
|
|
new_data = {**config_entry.data, **new_entry_data}
|
|
self.hass.config_entries.async_update_entry(config_entry, data=new_data)
|
|
self.hass.config_entries.async_schedule_reload(config_entry.entry_id)
|
|
return self.async_create_entry(data={})
|