mirror of
https://github.com/home-assistant/core.git
synced 2026-04-17 23:53:49 +01:00
475 lines
15 KiB
Python
475 lines
15 KiB
Python
"""Config flow for Satel Integra."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from satel_integra import AsyncSatel
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.binary_sensor import BinarySensorDeviceClass
|
|
from homeassistant.config_entries import (
|
|
ConfigEntry,
|
|
ConfigEntryState,
|
|
ConfigFlow,
|
|
ConfigFlowResult,
|
|
ConfigSubentryFlow,
|
|
OptionsFlow,
|
|
SubentryFlowResult,
|
|
)
|
|
from homeassistant.const import CONF_CODE, CONF_HOST, CONF_NAME, CONF_PORT
|
|
from homeassistant.core import callback
|
|
from homeassistant.helpers import config_validation as cv, selector
|
|
|
|
from .const import (
|
|
CONF_ARM_HOME_MODE,
|
|
CONF_OUTPUT_NUMBER,
|
|
CONF_PARTITION_NUMBER,
|
|
CONF_SWITCHABLE_OUTPUT_NUMBER,
|
|
CONF_ZONE_NUMBER,
|
|
CONF_ZONE_TYPE,
|
|
DEFAULT_CONF_ARM_HOME_MODE,
|
|
DEFAULT_PORT,
|
|
DOMAIN,
|
|
SUBENTRY_TYPE_OUTPUT,
|
|
SUBENTRY_TYPE_PARTITION,
|
|
SUBENTRY_TYPE_SWITCHABLE_OUTPUT,
|
|
SUBENTRY_TYPE_ZONE,
|
|
)
|
|
from .coordinator import SatelConfigEntry
|
|
|
|
_LOGGER = logging.getLogger(__package__)
|
|
|
|
CONNECTION_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_HOST): str,
|
|
vol.Required(CONF_PORT, default=DEFAULT_PORT): cv.port,
|
|
}
|
|
)
|
|
|
|
CODE_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Optional(CONF_CODE): cv.string,
|
|
}
|
|
)
|
|
|
|
PARTITION_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_NAME): cv.string,
|
|
vol.Required(CONF_ARM_HOME_MODE, default=DEFAULT_CONF_ARM_HOME_MODE): vol.In(
|
|
[1, 2, 3]
|
|
),
|
|
}
|
|
)
|
|
|
|
ZONE_AND_OUTPUT_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_NAME): cv.string,
|
|
vol.Required(
|
|
CONF_ZONE_TYPE, default=BinarySensorDeviceClass.MOTION
|
|
): selector.SelectSelector(
|
|
selector.SelectSelectorConfig(
|
|
options=[cls.value for cls in BinarySensorDeviceClass],
|
|
mode=selector.SelectSelectorMode.DROPDOWN,
|
|
translation_key="binary_sensor_device_class",
|
|
sort=True,
|
|
),
|
|
),
|
|
}
|
|
)
|
|
|
|
SWITCHABLE_OUTPUT_SCHEMA = vol.Schema({vol.Required(CONF_NAME): cv.string})
|
|
|
|
|
|
class SatelConfigFlow(ConfigFlow, domain=DOMAIN):
|
|
"""Handle a Satel Integra config flow."""
|
|
|
|
def __init__(self) -> None:
|
|
"""Initialize the config flow."""
|
|
super().__init__()
|
|
self.connection_data: dict[str, Any] = {}
|
|
|
|
VERSION = 2
|
|
MINOR_VERSION = 1
|
|
|
|
@staticmethod
|
|
@callback
|
|
def async_get_options_flow(
|
|
config_entry: SatelConfigEntry,
|
|
) -> SatelOptionsFlow:
|
|
"""Create the options flow."""
|
|
return SatelOptionsFlow()
|
|
|
|
@classmethod
|
|
@callback
|
|
def async_get_supported_subentry_types(
|
|
cls, config_entry: ConfigEntry
|
|
) -> dict[str, type[ConfigSubentryFlow]]:
|
|
"""Return subentries supported by this integration."""
|
|
return {
|
|
SUBENTRY_TYPE_PARTITION: PartitionSubentryFlowHandler,
|
|
SUBENTRY_TYPE_ZONE: ZoneSubentryFlowHandler,
|
|
SUBENTRY_TYPE_OUTPUT: OutputSubentryFlowHandler,
|
|
SUBENTRY_TYPE_SWITCHABLE_OUTPUT: SwitchableOutputSubentryFlowHandler,
|
|
}
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle a flow initialized by the user."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
self._async_abort_entries_match({CONF_HOST: user_input[CONF_HOST]})
|
|
|
|
if await self.test_connection(user_input[CONF_HOST], user_input[CONF_PORT]):
|
|
self.connection_data = {
|
|
CONF_HOST: user_input[CONF_HOST],
|
|
CONF_PORT: user_input[CONF_PORT],
|
|
}
|
|
return await self.async_step_code()
|
|
|
|
errors["base"] = "cannot_connect"
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
data_schema=CONNECTION_SCHEMA,
|
|
errors=errors,
|
|
)
|
|
|
|
async def async_step_code(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle code configuration."""
|
|
if user_input is not None:
|
|
return self.async_create_entry(
|
|
title=self.connection_data[CONF_HOST],
|
|
data=self.connection_data,
|
|
options={CONF_CODE: user_input.get(CONF_CODE)},
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="code",
|
|
data_schema=CODE_SCHEMA,
|
|
)
|
|
|
|
async def async_step_reconfigure(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle reconfiguration."""
|
|
errors: dict[str, str] = {}
|
|
reconfigure_entry = self._get_reconfigure_entry()
|
|
|
|
if user_input is not None:
|
|
self._async_abort_entries_match({CONF_HOST: user_input[CONF_HOST]})
|
|
|
|
if (
|
|
reconfigure_entry.state is not ConfigEntryState.LOADED
|
|
or reconfigure_entry.data != user_input
|
|
):
|
|
if not await self.test_connection(
|
|
user_input[CONF_HOST], user_input[CONF_PORT]
|
|
):
|
|
errors["base"] = "cannot_connect"
|
|
|
|
if not errors:
|
|
return self.async_update_reload_and_abort(
|
|
reconfigure_entry,
|
|
data_updates={
|
|
CONF_HOST: user_input[CONF_HOST],
|
|
CONF_PORT: user_input[CONF_PORT],
|
|
},
|
|
title=user_input[CONF_HOST],
|
|
)
|
|
|
|
suggested_values: dict[str, Any] = {
|
|
**reconfigure_entry.data,
|
|
**(user_input or {}),
|
|
}
|
|
|
|
return self.async_show_form(
|
|
step_id="reconfigure",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
CONNECTION_SCHEMA, suggested_values
|
|
),
|
|
errors=errors,
|
|
)
|
|
|
|
async def test_connection(self, host: str, port: int) -> bool:
|
|
"""Test a connection to the Satel alarm."""
|
|
controller = AsyncSatel(host, port)
|
|
|
|
try:
|
|
return await controller.connect(check_busy=False)
|
|
except Exception:
|
|
_LOGGER.exception(
|
|
"Unexpected error during connection test to %s:%s",
|
|
host,
|
|
port,
|
|
)
|
|
return False
|
|
finally:
|
|
await controller.close()
|
|
|
|
|
|
class SatelOptionsFlow(OptionsFlow):
|
|
"""Handle Satel options flow."""
|
|
|
|
async def async_step_init(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Init step."""
|
|
if user_input is not None:
|
|
return self.async_create_entry(data={CONF_CODE: user_input.get(CONF_CODE)})
|
|
|
|
return self.async_show_form(
|
|
step_id="init",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
CODE_SCHEMA, self.config_entry.options
|
|
),
|
|
)
|
|
|
|
|
|
class PartitionSubentryFlowHandler(ConfigSubentryFlow):
|
|
"""Handle subentry flow for adding and modifying a partition."""
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""User flow to add new partition."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
unique_id = f"{SUBENTRY_TYPE_PARTITION}_{user_input[CONF_PARTITION_NUMBER]}"
|
|
|
|
for existing_subentry in self._get_entry().subentries.values():
|
|
if existing_subentry.unique_id == unique_id:
|
|
errors[CONF_PARTITION_NUMBER] = "already_configured"
|
|
|
|
if not errors:
|
|
return self.async_create_entry(
|
|
title=f"{user_input[CONF_NAME]} ({user_input[CONF_PARTITION_NUMBER]})",
|
|
data=user_input,
|
|
unique_id=unique_id,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
errors=errors,
|
|
data_schema=vol.Schema(
|
|
{
|
|
vol.Required(CONF_PARTITION_NUMBER): vol.All(
|
|
vol.Coerce(int), vol.Range(min=1)
|
|
),
|
|
}
|
|
).extend(PARTITION_SCHEMA.schema),
|
|
)
|
|
|
|
async def async_step_reconfigure(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""Reconfigure existing partition."""
|
|
subconfig_entry = self._get_reconfigure_subentry()
|
|
|
|
if user_input is not None:
|
|
return self.async_update_and_abort(
|
|
self._get_entry(),
|
|
subconfig_entry,
|
|
title=f"{user_input[CONF_NAME]} ({subconfig_entry.data[CONF_PARTITION_NUMBER]})",
|
|
data_updates=user_input,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reconfigure",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
PARTITION_SCHEMA,
|
|
subconfig_entry.data,
|
|
),
|
|
description_placeholders={
|
|
CONF_PARTITION_NUMBER: subconfig_entry.data[CONF_PARTITION_NUMBER]
|
|
},
|
|
)
|
|
|
|
|
|
class ZoneSubentryFlowHandler(ConfigSubentryFlow):
|
|
"""Handle subentry flow for adding and modifying a zone."""
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""User flow to add new zone."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
unique_id = f"{SUBENTRY_TYPE_ZONE}_{user_input[CONF_ZONE_NUMBER]}"
|
|
|
|
for existing_subentry in self._get_entry().subentries.values():
|
|
if existing_subentry.unique_id == unique_id:
|
|
errors[CONF_ZONE_NUMBER] = "already_configured"
|
|
|
|
if not errors:
|
|
return self.async_create_entry(
|
|
title=f"{user_input[CONF_NAME]} ({user_input[CONF_ZONE_NUMBER]})",
|
|
data=user_input,
|
|
unique_id=unique_id,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
errors=errors,
|
|
data_schema=vol.Schema(
|
|
{
|
|
vol.Required(CONF_ZONE_NUMBER): vol.All(
|
|
vol.Coerce(int), vol.Range(min=1)
|
|
),
|
|
}
|
|
).extend(ZONE_AND_OUTPUT_SCHEMA.schema),
|
|
)
|
|
|
|
async def async_step_reconfigure(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""Reconfigure existing zone."""
|
|
subconfig_entry = self._get_reconfigure_subentry()
|
|
|
|
if user_input is not None:
|
|
return self.async_update_and_abort(
|
|
self._get_entry(),
|
|
subconfig_entry,
|
|
title=f"{user_input[CONF_NAME]} ({subconfig_entry.data[CONF_ZONE_NUMBER]})",
|
|
data_updates=user_input,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reconfigure",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
ZONE_AND_OUTPUT_SCHEMA, subconfig_entry.data
|
|
),
|
|
description_placeholders={
|
|
CONF_ZONE_NUMBER: subconfig_entry.data[CONF_ZONE_NUMBER]
|
|
},
|
|
)
|
|
|
|
|
|
class OutputSubentryFlowHandler(ConfigSubentryFlow):
|
|
"""Handle subentry flow for adding and modifying a output."""
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""User flow to add new output."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
unique_id = f"{SUBENTRY_TYPE_OUTPUT}_{user_input[CONF_OUTPUT_NUMBER]}"
|
|
|
|
for existing_subentry in self._get_entry().subentries.values():
|
|
if existing_subentry.unique_id == unique_id:
|
|
errors[CONF_OUTPUT_NUMBER] = "already_configured"
|
|
|
|
if not errors:
|
|
return self.async_create_entry(
|
|
title=f"{user_input[CONF_NAME]} ({user_input[CONF_OUTPUT_NUMBER]})",
|
|
data=user_input,
|
|
unique_id=unique_id,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
errors=errors,
|
|
data_schema=vol.Schema(
|
|
{
|
|
vol.Required(CONF_OUTPUT_NUMBER): vol.All(
|
|
vol.Coerce(int), vol.Range(min=1)
|
|
),
|
|
}
|
|
).extend(ZONE_AND_OUTPUT_SCHEMA.schema),
|
|
)
|
|
|
|
async def async_step_reconfigure(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""Reconfigure existing output."""
|
|
subconfig_entry = self._get_reconfigure_subentry()
|
|
|
|
if user_input is not None:
|
|
return self.async_update_and_abort(
|
|
self._get_entry(),
|
|
subconfig_entry,
|
|
title=f"{user_input[CONF_NAME]} ({subconfig_entry.data[CONF_OUTPUT_NUMBER]})",
|
|
data_updates=user_input,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reconfigure",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
ZONE_AND_OUTPUT_SCHEMA, subconfig_entry.data
|
|
),
|
|
description_placeholders={
|
|
CONF_OUTPUT_NUMBER: subconfig_entry.data[CONF_OUTPUT_NUMBER]
|
|
},
|
|
)
|
|
|
|
|
|
class SwitchableOutputSubentryFlowHandler(ConfigSubentryFlow):
|
|
"""Handle subentry flow for adding and modifying a switchable output."""
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""User flow to add new switchable output."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
unique_id = f"{SUBENTRY_TYPE_SWITCHABLE_OUTPUT}_{user_input[CONF_SWITCHABLE_OUTPUT_NUMBER]}"
|
|
|
|
for existing_subentry in self._get_entry().subentries.values():
|
|
if existing_subentry.unique_id == unique_id:
|
|
errors[CONF_SWITCHABLE_OUTPUT_NUMBER] = "already_configured"
|
|
|
|
if not errors:
|
|
return self.async_create_entry(
|
|
title=f"{user_input[CONF_NAME]} ({user_input[CONF_SWITCHABLE_OUTPUT_NUMBER]})",
|
|
data=user_input,
|
|
unique_id=unique_id,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
errors=errors,
|
|
data_schema=vol.Schema(
|
|
{
|
|
vol.Required(CONF_SWITCHABLE_OUTPUT_NUMBER): vol.All(
|
|
vol.Coerce(int), vol.Range(min=1)
|
|
),
|
|
}
|
|
).extend(SWITCHABLE_OUTPUT_SCHEMA.schema),
|
|
)
|
|
|
|
async def async_step_reconfigure(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""Reconfigure existing switchable output."""
|
|
subconfig_entry = self._get_reconfigure_subentry()
|
|
|
|
if user_input is not None:
|
|
return self.async_update_and_abort(
|
|
self._get_entry(),
|
|
subconfig_entry,
|
|
title=f"{user_input[CONF_NAME]} ({subconfig_entry.data[CONF_SWITCHABLE_OUTPUT_NUMBER]})",
|
|
data_updates=user_input,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reconfigure",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
SWITCHABLE_OUTPUT_SCHEMA, subconfig_entry.data
|
|
),
|
|
description_placeholders={
|
|
CONF_SWITCHABLE_OUTPUT_NUMBER: subconfig_entry.data[
|
|
CONF_SWITCHABLE_OUTPUT_NUMBER
|
|
]
|
|
},
|
|
)
|