mirror of
https://github.com/home-assistant/core.git
synced 2026-02-21 18:38:17 +00:00
188 lines
6.3 KiB
Python
188 lines
6.3 KiB
Python
"""Config flow for Splunk integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping
|
|
import logging
|
|
from typing import Any
|
|
|
|
from hass_splunk import hass_splunk
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
|
|
from homeassistant.const import (
|
|
CONF_HOST,
|
|
CONF_NAME,
|
|
CONF_PORT,
|
|
CONF_SSL,
|
|
CONF_TOKEN,
|
|
CONF_VERIFY_SSL,
|
|
)
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
|
|
from .const import DEFAULT_HOST, DEFAULT_PORT, DOMAIN
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class SplunkConfigFlow(ConfigFlow, domain=DOMAIN):
|
|
"""Handle a config flow for Splunk."""
|
|
|
|
VERSION = 1
|
|
MINOR_VERSION = 1
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle the initial step."""
|
|
# Single instance integration - manifest enforces only one entry
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
errors = await self._async_validate_input(user_input)
|
|
|
|
if not errors:
|
|
host = user_input[CONF_HOST]
|
|
port = user_input[CONF_PORT]
|
|
return self.async_create_entry(
|
|
title=f"{host}:{port}",
|
|
data=user_input,
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
data_schema=vol.Schema(
|
|
{
|
|
vol.Required(CONF_TOKEN): str,
|
|
vol.Required(CONF_HOST): str,
|
|
vol.Optional(CONF_PORT, default=DEFAULT_PORT): int,
|
|
vol.Optional(CONF_SSL, default=False): bool,
|
|
vol.Optional(CONF_VERIFY_SSL, default=True): bool,
|
|
vol.Optional(CONF_NAME): str,
|
|
}
|
|
),
|
|
errors=errors,
|
|
)
|
|
|
|
async def async_step_import(
|
|
self, import_config: dict[str, Any]
|
|
) -> ConfigFlowResult:
|
|
"""Handle import from YAML configuration."""
|
|
# Single instance integration - manifest prevents duplicates
|
|
# Validate the imported configuration
|
|
errors = await self._async_validate_input(import_config)
|
|
|
|
if errors:
|
|
# Map error keys to abort reasons for issue creation
|
|
error_key = errors.get("base", "unknown")
|
|
_LOGGER.error("Failed to import Splunk configuration from YAML: %s", errors)
|
|
return self.async_abort(reason=error_key)
|
|
|
|
host = import_config.get(CONF_HOST, DEFAULT_HOST)
|
|
port = import_config.get(CONF_PORT, DEFAULT_PORT)
|
|
return self.async_create_entry(
|
|
title=f"{host}:{port}",
|
|
data=import_config,
|
|
)
|
|
|
|
async def async_step_reconfigure(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle reconfiguration of the Splunk integration."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
errors = await self._async_validate_input(user_input)
|
|
|
|
if not errors:
|
|
return self.async_update_reload_and_abort(
|
|
self._get_reconfigure_entry(),
|
|
data_updates=user_input,
|
|
title=f"{user_input[CONF_HOST]}:{user_input[CONF_PORT]}",
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reconfigure",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
vol.Schema(
|
|
{
|
|
vol.Required(CONF_TOKEN): str,
|
|
vol.Required(CONF_HOST): str,
|
|
vol.Optional(CONF_PORT, default=DEFAULT_PORT): int,
|
|
vol.Optional(CONF_SSL, default=False): bool,
|
|
vol.Optional(CONF_VERIFY_SSL, default=True): bool,
|
|
vol.Optional(CONF_NAME): str,
|
|
}
|
|
),
|
|
self._get_reconfigure_entry().data,
|
|
),
|
|
errors=errors,
|
|
)
|
|
|
|
async def async_step_reauth(
|
|
self, entry_data: Mapping[str, Any]
|
|
) -> ConfigFlowResult:
|
|
"""Handle reauth upon an API authentication error."""
|
|
return await self.async_step_reauth_confirm()
|
|
|
|
async def async_step_reauth_confirm(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Confirm reauth dialog."""
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
reauth_entry = self._get_reauth_entry()
|
|
|
|
# Test the new token with existing config
|
|
test_config = {**reauth_entry.data, CONF_TOKEN: user_input[CONF_TOKEN]}
|
|
errors = await self._async_validate_input(test_config)
|
|
|
|
if not errors:
|
|
return self.async_update_reload_and_abort(
|
|
reauth_entry,
|
|
data_updates={CONF_TOKEN: user_input[CONF_TOKEN]},
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reauth_confirm",
|
|
data_schema=vol.Schema({vol.Required(CONF_TOKEN): str}),
|
|
errors=errors,
|
|
)
|
|
|
|
async def _async_validate_input(self, user_input: dict[str, Any]) -> dict[str, str]:
|
|
"""Validate user input and return errors if any."""
|
|
errors: dict[str, str] = {}
|
|
|
|
event_collector = hass_splunk(
|
|
session=async_get_clientsession(self.hass),
|
|
host=user_input.get(CONF_HOST, DEFAULT_HOST),
|
|
port=user_input.get(CONF_PORT, DEFAULT_PORT),
|
|
token=user_input[CONF_TOKEN],
|
|
use_ssl=user_input.get(CONF_SSL, False),
|
|
verify_ssl=user_input.get(CONF_VERIFY_SSL, True),
|
|
)
|
|
|
|
try:
|
|
# First check connectivity
|
|
connectivity_ok = await event_collector.check(
|
|
connectivity=True, token=False, busy=False
|
|
)
|
|
if not connectivity_ok:
|
|
errors["base"] = "cannot_connect"
|
|
return errors
|
|
|
|
# Then check token validity
|
|
token_ok = await event_collector.check(
|
|
connectivity=False, token=True, busy=False
|
|
)
|
|
if not token_ok:
|
|
errors["base"] = "invalid_auth"
|
|
return errors
|
|
|
|
except Exception:
|
|
_LOGGER.exception("Unexpected error validating Splunk connection")
|
|
errors["base"] = "unknown"
|
|
|
|
return errors
|