1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-24 12:59:34 +00:00

Update typing 01 (#48013)

This commit is contained in:
Marc Mueller
2021-03-17 17:34:55 +01:00
committed by GitHub
parent 9011a54e7f
commit e55702d635
11 changed files with 303 additions and 313 deletions

View File

@@ -5,7 +5,7 @@ import asyncio
import functools
import logging
from types import MappingProxyType, MethodType
from typing import Any, Callable, Dict, List, Optional, Set, Union, cast
from typing import Any, Callable, Optional, cast
import weakref
import attr
@@ -143,11 +143,11 @@ class ConfigEntry:
source: str,
connection_class: str,
system_options: dict,
options: Optional[dict] = None,
unique_id: Optional[str] = None,
entry_id: Optional[str] = None,
options: dict | None = None,
unique_id: str | None = None,
entry_id: str | None = None,
state: str = ENTRY_STATE_NOT_LOADED,
disabled_by: Optional[str] = None,
disabled_by: str | None = None,
) -> None:
"""Initialize a config entry."""
# Unique id of the config entry
@@ -190,18 +190,18 @@ class ConfigEntry:
self.supports_unload = False
# Listeners to call on update
self.update_listeners: List[
Union[weakref.ReferenceType[UpdateListenerType], weakref.WeakMethod]
self.update_listeners: list[
weakref.ReferenceType[UpdateListenerType] | weakref.WeakMethod
] = []
# Function to cancel a scheduled retry
self._async_cancel_retry_setup: Optional[Callable[[], Any]] = None
self._async_cancel_retry_setup: Callable[[], Any] | None = None
async def async_setup(
self,
hass: HomeAssistant,
*,
integration: Optional[loader.Integration] = None,
integration: loader.Integration | None = None,
tries: int = 0,
) -> None:
"""Set up an entry."""
@@ -295,7 +295,7 @@ class ConfigEntry:
self.state = ENTRY_STATE_SETUP_ERROR
async def async_unload(
self, hass: HomeAssistant, *, integration: Optional[loader.Integration] = None
self, hass: HomeAssistant, *, integration: loader.Integration | None = None
) -> bool:
"""Unload an entry.
@@ -442,7 +442,7 @@ class ConfigEntry:
return lambda: self.update_listeners.remove(weak_listener)
def as_dict(self) -> Dict[str, Any]:
def as_dict(self) -> dict[str, Any]:
"""Return dictionary version of this entry."""
return {
"entry_id": self.entry_id,
@@ -471,8 +471,8 @@ class ConfigEntriesFlowManager(data_entry_flow.FlowManager):
self._hass_config = hass_config
async def async_finish_flow(
self, flow: data_entry_flow.FlowHandler, result: Dict[str, Any]
) -> Dict[str, Any]:
self, flow: data_entry_flow.FlowHandler, result: dict[str, Any]
) -> dict[str, Any]:
"""Finish a config flow and add an entry."""
flow = cast(ConfigFlow, flow)
@@ -542,7 +542,7 @@ class ConfigEntriesFlowManager(data_entry_flow.FlowManager):
return result
async def async_create_flow(
self, handler_key: Any, *, context: Optional[Dict] = None, data: Any = None
self, handler_key: Any, *, context: dict | None = None, data: Any = None
) -> ConfigFlow:
"""Create a flow for specified handler.
@@ -619,14 +619,14 @@ class ConfigEntries:
self.flow = ConfigEntriesFlowManager(hass, self, hass_config)
self.options = OptionsFlowManager(hass)
self._hass_config = hass_config
self._entries: List[ConfigEntry] = []
self._entries: list[ConfigEntry] = []
self._store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
EntityRegistryDisabledHandler(hass).async_setup()
@callback
def async_domains(self) -> List[str]:
def async_domains(self) -> list[str]:
"""Return domains for which we have entries."""
seen: Set[str] = set()
seen: set[str] = set()
result = []
for entry in self._entries:
@@ -637,7 +637,7 @@ class ConfigEntries:
return result
@callback
def async_get_entry(self, entry_id: str) -> Optional[ConfigEntry]:
def async_get_entry(self, entry_id: str) -> ConfigEntry | None:
"""Return entry with matching entry_id."""
for entry in self._entries:
if entry_id == entry.entry_id:
@@ -645,7 +645,7 @@ class ConfigEntries:
return None
@callback
def async_entries(self, domain: Optional[str] = None) -> List[ConfigEntry]:
def async_entries(self, domain: str | None = None) -> list[ConfigEntry]:
"""Return all entries or entries for a specific domain."""
if domain is None:
return list(self._entries)
@@ -657,7 +657,7 @@ class ConfigEntries:
await self.async_setup(entry.entry_id)
self._async_schedule_save()
async def async_remove(self, entry_id: str) -> Dict[str, Any]:
async def async_remove(self, entry_id: str) -> dict[str, Any]:
"""Remove an entry."""
entry = self.async_get_entry(entry_id)
@@ -789,7 +789,7 @@ class ConfigEntries:
return await self.async_setup(entry_id)
async def async_set_disabled_by(
self, entry_id: str, disabled_by: Optional[str]
self, entry_id: str, disabled_by: str | None
) -> bool:
"""Disable an entry.
@@ -829,11 +829,11 @@ class ConfigEntries:
self,
entry: ConfigEntry,
*,
unique_id: Union[str, dict, None, UndefinedType] = UNDEFINED,
title: Union[str, dict, UndefinedType] = UNDEFINED,
data: Union[dict, UndefinedType] = UNDEFINED,
options: Union[dict, UndefinedType] = UNDEFINED,
system_options: Union[dict, UndefinedType] = UNDEFINED,
unique_id: str | dict | None | UndefinedType = UNDEFINED,
title: str | dict | UndefinedType = UNDEFINED,
data: dict | UndefinedType = UNDEFINED,
options: dict | UndefinedType = UNDEFINED,
system_options: dict | UndefinedType = UNDEFINED,
) -> bool:
"""Update a config entry.
@@ -918,12 +918,12 @@ class ConfigEntries:
self._store.async_delay_save(self._data_to_save, SAVE_DELAY)
@callback
def _data_to_save(self) -> Dict[str, List[Dict[str, Any]]]:
def _data_to_save(self) -> dict[str, list[dict[str, Any]]]:
"""Return data to save."""
return {"entries": [entry.as_dict() for entry in self._entries]}
async def _old_conf_migrator(old_config: Dict[str, Any]) -> Dict[str, Any]:
async def _old_conf_migrator(old_config: dict[str, Any]) -> dict[str, Any]:
"""Migrate the pre-0.73 config format to the latest version."""
return {"entries": old_config}
@@ -931,7 +931,7 @@ async def _old_conf_migrator(old_config: Dict[str, Any]) -> Dict[str, Any]:
class ConfigFlow(data_entry_flow.FlowHandler):
"""Base class for config flows with some helpers."""
def __init_subclass__(cls, domain: Optional[str] = None, **kwargs: Any) -> None:
def __init_subclass__(cls, domain: str | None = None, **kwargs: Any) -> None:
"""Initialize a subclass, register if possible."""
super().__init_subclass__(**kwargs) # type: ignore
if domain is not None:
@@ -940,7 +940,7 @@ class ConfigFlow(data_entry_flow.FlowHandler):
CONNECTION_CLASS = CONN_CLASS_UNKNOWN
@property
def unique_id(self) -> Optional[str]:
def unique_id(self) -> str | None:
"""Return unique ID if available."""
if not self.context:
return None
@@ -956,7 +956,7 @@ class ConfigFlow(data_entry_flow.FlowHandler):
@callback
def _abort_if_unique_id_configured(
self,
updates: Optional[Dict[Any, Any]] = None,
updates: dict[Any, Any] | None = None,
reload_on_update: bool = True,
) -> None:
"""Abort if the unique ID is already configured."""
@@ -983,8 +983,8 @@ class ConfigFlow(data_entry_flow.FlowHandler):
raise data_entry_flow.AbortFlow("already_configured")
async def async_set_unique_id(
self, unique_id: Optional[str] = None, *, raise_on_progress: bool = True
) -> Optional[ConfigEntry]:
self, unique_id: str | None = None, *, raise_on_progress: bool = True
) -> ConfigEntry | None:
"""Set a unique ID for the config flow.
Returns optionally existing config entry with same ID.
@@ -1020,7 +1020,7 @@ class ConfigFlow(data_entry_flow.FlowHandler):
self.context["confirm_only"] = True
@callback
def _async_current_entries(self, include_ignore: bool = False) -> List[ConfigEntry]:
def _async_current_entries(self, include_ignore: bool = False) -> list[ConfigEntry]:
"""Return current entries.
If the flow is user initiated, filter out ignored entries unless include_ignore is True.
@@ -1033,7 +1033,7 @@ class ConfigFlow(data_entry_flow.FlowHandler):
return [entry for entry in config_entries if entry.source != SOURCE_IGNORE]
@callback
def _async_current_ids(self, include_ignore: bool = True) -> Set[Optional[str]]:
def _async_current_ids(self, include_ignore: bool = True) -> set[str | None]:
"""Return current unique IDs."""
return {
entry.unique_id
@@ -1042,7 +1042,7 @@ class ConfigFlow(data_entry_flow.FlowHandler):
}
@callback
def _async_in_progress(self) -> List[Dict]:
def _async_in_progress(self) -> list[dict]:
"""Return other in progress flows for current domain."""
return [
flw
@@ -1050,18 +1050,18 @@ class ConfigFlow(data_entry_flow.FlowHandler):
if flw["handler"] == self.handler and flw["flow_id"] != self.flow_id
]
async def async_step_ignore(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
async def async_step_ignore(self, user_input: dict[str, Any]) -> dict[str, Any]:
"""Ignore this config flow."""
await self.async_set_unique_id(user_input["unique_id"], raise_on_progress=False)
return self.async_create_entry(title=user_input["title"], data={})
async def async_step_unignore(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
async def async_step_unignore(self, user_input: dict[str, Any]) -> dict[str, Any]:
"""Rediscover a config entry by it's unique_id."""
return self.async_abort(reason="not_implemented")
async def async_step_user(
self, user_input: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
self, user_input: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Handle a flow initiated by the user."""
return self.async_abort(reason="not_implemented")
@@ -1090,16 +1090,16 @@ class ConfigFlow(data_entry_flow.FlowHandler):
raise data_entry_flow.AbortFlow("already_in_progress")
async def async_step_discovery(
self, discovery_info: Dict[str, Any]
) -> Dict[str, Any]:
self, discovery_info: dict[str, Any]
) -> dict[str, Any]:
"""Handle a flow initialized by discovery."""
await self._async_handle_discovery_without_unique_id()
return await self.async_step_user()
@callback
def async_abort(
self, *, reason: str, description_placeholders: Optional[Dict] = None
) -> Dict[str, Any]:
self, *, reason: str, description_placeholders: dict | None = None
) -> dict[str, Any]:
"""Abort the config flow."""
# Remove reauth notification if no reauth flows are in progress
if self.source == SOURCE_REAUTH and not any(
@@ -1130,8 +1130,8 @@ class OptionsFlowManager(data_entry_flow.FlowManager):
self,
handler_key: Any,
*,
context: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
context: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> OptionsFlow:
"""Create an options flow for a config entry.
@@ -1147,8 +1147,8 @@ class OptionsFlowManager(data_entry_flow.FlowManager):
return cast(OptionsFlow, HANDLERS[entry.domain].async_get_options_flow(entry))
async def async_finish_flow(
self, flow: data_entry_flow.FlowHandler, result: Dict[str, Any]
) -> Dict[str, Any]:
self, flow: data_entry_flow.FlowHandler, result: dict[str, Any]
) -> dict[str, Any]:
"""Finish an options flow and update options for configuration entry.
Flow.handler and entry_id is the same thing to map flow with entry.
@@ -1184,7 +1184,7 @@ class SystemOptions:
"""Update properties."""
self.disable_new_entities = disable_new_entities
def as_dict(self) -> Dict[str, Any]:
def as_dict(self) -> dict[str, Any]:
"""Return dictionary version of this config entries system options."""
return {"disable_new_entities": self.disable_new_entities}
@@ -1195,9 +1195,9 @@ class EntityRegistryDisabledHandler:
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the handler."""
self.hass = hass
self.registry: Optional[entity_registry.EntityRegistry] = None
self.changed: Set[str] = set()
self._remove_call_later: Optional[Callable[[], None]] = None
self.registry: entity_registry.EntityRegistry | None = None
self.changed: set[str] = set()
self._remove_call_later: Callable[[], None] | None = None
@callback
def async_setup(self) -> None: