1
0
mirror of https://github.com/home-assistant/core.git synced 2026-04-17 23:53:49 +01:00
Files
core/homeassistant/components/tesla_fleet/config_flow.py
2026-02-09 15:33:05 +01:00

253 lines
8.4 KiB
Python

"""Config Flow for Tesla Fleet integration."""
from __future__ import annotations
from collections.abc import Mapping
import logging
import re
from typing import Any, cast
import jwt
from tesla_fleet_api import TeslaFleetApi
from tesla_fleet_api.const import SERVERS
from tesla_fleet_api.exceptions import PreconditionFailed, TeslaFleetError
import voluptuous as vol
from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlowResult
from homeassistant.helpers import config_entry_oauth2_flow
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
QrCodeSelector,
QrCodeSelectorConfig,
QrErrorCorrectionLevel,
)
from .const import CONF_DOMAIN, DOMAIN, LOGGER
from .oauth import TeslaUserImplementation
class OAuth2FlowHandler(
    config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
):
    """Config flow to handle Tesla Fleet API OAuth2 authentication.

    Once OAuth has completed, a new (non-reauth) flow continues through three
    further steps defined here:

    * ``domain_input`` - ask the user for the partner domain.
    * ``domain_registration`` - register that domain with every prepared
      partner API client and verify the public key returned by the server.
    * ``registration_complete`` - show the virtual key URL / QR code and
      create the config entry once the user confirms.

    A reauth flow stops right after OAuth and only updates the existing entry.
    """

    DOMAIN = DOMAIN

    # Compiled once when the class is created rather than on every validation
    # call. Used with ``fullmatch`` (see ``_is_valid_domain``).
    _DOMAIN_PATTERN = re.compile(
        r"^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$"
    )

    def __init__(self) -> None:
        """Initialize config flow."""
        super().__init__()
        # Domain entered by the user in the ``domain_input`` step.
        self.domain: str | None = None
        # OAuth data handed to ``async_oauth_create_entry``; becomes entry data.
        self.data: dict[str, Any] = {}
        # ``sub`` claim of the access token; used as unique id and entry title.
        self.uid: str | None = None
        # One partner-scoped API client per server (excluding "cn").
        self.apis: list[TeslaFleetApi] = []

    @property
    def logger(self) -> logging.Logger:
        """Return logger."""
        return LOGGER

    async def async_oauth_create_entry(
        self,
        data: dict[str, Any],
    ) -> ConfigFlowResult:
        """Handle OAuth completion and proceed to domain registration.

        For a reauth flow the existing entry is updated with ``data`` and the
        flow aborts. Otherwise a partner API client is prepared for every
        server in ``SERVERS`` except ``"cn"`` and the flow continues with the
        domain input step.
        """
        # The signature is deliberately not verified: the token is only read
        # here to obtain its ``sub`` claim.
        token = jwt.decode(
            data["token"]["access_token"], options={"verify_signature": False}
        )

        self.data = data
        self.uid = token["sub"]
        await self.async_set_unique_id(self.uid)

        if self.source == SOURCE_REAUTH:
            self._abort_if_unique_id_mismatch(reason="reauth_account_mismatch")
            return self.async_update_reload_and_abort(
                self._get_reauth_entry(), data=data
            )
        self._abort_if_unique_id_configured()

        # OAuth done, setup Partner API connections for all regions
        implementation = cast(TeslaUserImplementation, self.flow_impl)
        session = async_get_clientsession(self.hass)

        for region, server_url in SERVERS.items():
            if region == "cn":
                continue
            api = TeslaFleetApi(
                session=session,
                access_token="",
                server=server_url,
                partner_scope=True,
                charging_scope=False,
                energy_scope=False,
                user_scope=False,
                vehicle_scope=False,
            )
            await api.get_private_key(self.hass.config.path("tesla_fleet.key"))
            await api.partner_login(
                implementation.client_id, implementation.client_secret
            )
            self.apis.append(api)

        return await self.async_step_domain_input()

    async def async_step_domain_input(
        self,
        user_input: dict[str, Any] | None = None,
        errors: dict[str, str] | None = None,
    ) -> ConfigFlowResult:
        """Handle domain input step.

        ``errors`` lets another step (domain registration) re-show this form
        with a pre-populated error. When ``user_input`` is given, the domain
        is stripped and lower-cased, validated, stored on ``self.domain`` and
        the flow moves on to domain registration.
        """
        errors = errors or {}

        if user_input is not None:
            domain = user_input[CONF_DOMAIN].strip().lower()

            # Validate domain format
            if not self._is_valid_domain(domain):
                errors[CONF_DOMAIN] = "invalid_domain"
            else:
                self.domain = domain
                return await self.async_step_domain_registration()

        return self.async_show_form(
            step_id="domain_input",
            description_placeholders={
                "dashboard": "https://developer.tesla.com/en_AU/dashboard/"
            },
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_DOMAIN): str,
                }
            ),
            errors=errors,
        )

    async def async_step_domain_registration(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle domain registration for all regions.

        Registers ``self.domain`` with every prepared API client:

        * ``PreconditionFailed`` sends the user straight back to the domain
          input form with a ``precondition_failed`` error.
        * Any other ``TeslaFleetError`` is logged and that server is recorded
          as failed; the remaining servers are still attempted.
        * The first successful response is kept and its public key is compared
          (case-insensitively) with the local key before continuing.

        ``user_input`` is not inspected; submitting the form simply re-runs
        the registration attempt.
        """
        assert self.apis
        assert self.apis[0].private_key
        assert self.domain

        errors: dict[str, str] = {}
        description_placeholders = {
            "public_key_url": f"https://{self.domain}/.well-known/appspecific/com.tesla.3p.public-key.pem",
            "pem": self.apis[0].public_pem,
        }

        successful_response: dict[str, Any] | None = None
        failed_regions: list[str] = []

        for api in self.apis:
            try:
                register_response = await api.partner.register(self.domain)
            except PreconditionFailed:
                return await self.async_step_domain_input(
                    errors={CONF_DOMAIN: "precondition_failed"}
                )
            except TeslaFleetError as e:
                LOGGER.warning(
                    "Partner registration failed for %s: %s",
                    api.server,
                    e.message,
                )
                failed_regions.append(api.server or "unknown")
            else:
                # Only the first successful response is used for verification.
                if successful_response is None:
                    successful_response = register_response

        if successful_response is None:
            # Every server failed: let the user retry from this step.
            errors["base"] = "invalid_response"
            return self.async_show_form(
                step_id="domain_registration",
                description_placeholders=description_placeholders,
                errors=errors,
            )

        if failed_regions:
            LOGGER.warning(
                "Partner registration succeeded on some regions but failed on: %s",
                ", ".join(failed_regions),
            )

        # Verify public key from the successful response
        registered_public_key = successful_response.get("response", {}).get(
            "public_key"
        )

        if not registered_public_key:
            errors["base"] = "public_key_not_found"
        elif (
            registered_public_key.lower()
            != self.apis[0].public_uncompressed_point.lower()
        ):
            errors["base"] = "public_key_mismatch"
        else:
            return await self.async_step_registration_complete()

        return self.async_show_form(
            step_id="domain_registration",
            description_placeholders=description_placeholders,
            errors=errors,
        )

    async def async_step_registration_complete(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Show completion and virtual key installation.

        On submission (with ``self.uid`` and ``self.data`` set) the config
        entry is created. Otherwise a form with the virtual key URL and a QR
        code for it is shown; if no domain is stored the flow falls back to
        the domain input step.
        """
        if user_input is not None and self.uid and self.data:
            return self.async_create_entry(title=self.uid, data=self.data)

        if not self.domain:
            return await self.async_step_domain_input()

        virtual_key_url = f"https://www.tesla.com/_ak/{self.domain}"
        data_schema = vol.Schema({}).extend(
            {
                vol.Optional("qr_code"): QrCodeSelector(
                    config=QrCodeSelectorConfig(
                        data=virtual_key_url,
                        scale=6,
                        error_correction_level=QrErrorCorrectionLevel.QUARTILE,
                    )
                ),
            }
        )

        return self.async_show_form(
            step_id="registration_complete",
            data_schema=data_schema,
            description_placeholders={
                "virtual_key_url": virtual_key_url,
            },
        )

    async def async_step_reauth(
        self, entry_data: Mapping[str, Any]
    ) -> ConfigFlowResult:
        """Perform reauth upon an API authentication error."""
        return await self.async_step_reauth_confirm()

    async def async_step_reauth_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Confirm reauth dialog."""
        if user_input is None:
            return self.async_show_form(
                step_id="reauth_confirm",
                description_placeholders={"name": "Tesla Fleet"},
            )
        # For reauth, skip domain registration and go straight to OAuth
        return await super().async_step_user()

    def _is_valid_domain(self, domain: str) -> bool:
        """Validate domain format.

        Returns True only when the *entire* string is a dot-separated list of
        labels (alphanumeric, inner hyphens allowed, at most 63 characters
        each) ending in an alphabetic TLD of at least two letters.
        """
        # ``fullmatch`` rather than ``match``: with ``match`` the trailing
        # ``$`` also succeeds just before a final newline, so a value such as
        # "example.com\n" would have been accepted.
        return self._DOMAIN_PATTERN.fullmatch(domain) is not None