Preload storage for integrations we know we are going to setup (#114192)
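This commit to home-assistant/core changes homeassistant/helpers/storage.py: it adds an internal _StoreManager that lists the .storage directory once, preloads the JSON files of integrations we know we are going to set up, and hands the cached data to Store.async_load(), so those stores skip a per-file executor read during startup. The sketch below shows how the manager is meant to be driven from startup code; the bootstrap side is not part of this diff, so the preload_storage helper and the storage_keys argument are illustrative assumptions, while get_internal_store_manager, async_initialize, and async_preload come from the diff itself.

# Illustration only -- not part of this commit. The call site and key list are
# assumptions; only the manager API used here is defined in the diff below.
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import get_internal_store_manager


async def preload_storage(hass: HomeAssistant, storage_keys: list[str]) -> None:
    """Warm the .storage cache before the integrations that own these keys load."""
    manager = get_internal_store_manager(hass)
    await manager.async_initialize()  # one executor job: list the .storage directory
    await manager.async_preload(storage_keys)  # one executor job: read existing files

The storage.py diff follows.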
@@ -3,16 +3,21 @@
 from __future__ import annotations
 
 import asyncio
-from collections.abc import Callable, Mapping, Sequence
+from collections.abc import Callable, Iterable, Mapping, Sequence
 from contextlib import suppress
 from copy import deepcopy
 import inspect
 from json import JSONDecodeError, JSONEncoder
 import logging
 import os
+from pathlib import Path
 from typing import TYPE_CHECKING, Any, Generic, TypeVar
 
-from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE
+from homeassistant.const import (
+    EVENT_HOMEASSISTANT_FINAL_WRITE,
+    EVENT_HOMEASSISTANT_STARTED,
+    EVENT_HOMEASSISTANT_STOP,
+)
 from homeassistant.core import (
     CALLBACK_TYPE,
     DOMAIN as HOMEASSISTANT_DOMAIN,
@@ -43,7 +48,9 @@ STORAGE_DIR = ".storage"
 _LOGGER = logging.getLogger(__name__)
 
 STORAGE_SEMAPHORE = "storage_semaphore"
+STORAGE_MANAGER = "storage_manager"
 
+MANAGER_CLEANUP_DELAY = 60
 
 _T = TypeVar("_T", bound=Mapping[str, Any] | Sequence[Any])
 
@@ -88,6 +95,147 @@ async def async_migrator(
     return config
 
 
+def get_internal_store_manager(
+    hass: HomeAssistant, config_dir: str | None = None
+) -> _StoreManager:
+    """Get the store manager.
+
+    This function is not part of the API and should only be
+    used in the Home Assistant core internals. It is not
+    guaranteed to be stable.
+    """
+    if STORAGE_MANAGER not in hass.data:
+        manager = _StoreManager(hass, config_dir or hass.config.config_dir)
+        hass.data[STORAGE_MANAGER] = manager
+    return hass.data[STORAGE_MANAGER]
+
+
+class _StoreManager:
+    """Class to help storing data.
+
+    The store manager is used to cache and manage storage files.
+    """
+
+    def __init__(self, hass: HomeAssistant, config_dir: str) -> None:
+        """Initialize storage manager class."""
+        self._hass = hass
+        self._invalidated: set[str] = set()
+        self._files: set[str] | None = None
+        self._data_preload: dict[str, json_util.JsonValueType] = {}
+        self._storage_path: Path = Path(config_dir).joinpath(STORAGE_DIR)
+        self._cancel_cleanup: asyncio.TimerHandle | None = None
+
+    async def async_initialize(self) -> None:
+        """Initialize the storage manager."""
+        hass = self._hass
+        await hass.async_add_executor_job(self._initialize_files)
+        hass.bus.async_listen_once(
+            EVENT_HOMEASSISTANT_STARTED,
+            self._async_schedule_cleanup,
+            run_immediately=True,
+        )
+
+    @callback
+    def async_invalidate(self, key: str) -> None:
+        """Invalidate cache.
+
+        Store calls this when its going to save data
+        to ensure that the cache is not used after that.
+        """
+        if "/" not in key:
+            self._invalidated.add(key)
+            self._data_preload.pop(key, None)
+
+    @callback
+    def async_fetch(
+        self, key: str
+    ) -> tuple[bool, json_util.JsonValueType | None] | None:
+        """Fetch data from cache."""
+        #
+        # If the key is invalidated, we don't need to check the cache
+        # If async_initialize has not been called yet, we don't know
+        # if the file exists or not so its a cache miss
+        #
+        # It is very important that we check if self._files is None
+        # because we do not want to incorrectly return a cache miss
+        # because async_initialize has not been called yet as it would
+        # cause the Store to return None when it should not.
+        #
+        # The "/" in key check is to prevent the cache from being used
+        # for subdirs in case we have a key like "hacs/XXX"
+        #
+        if "/" in key or key in self._invalidated or self._files is None:
+            _LOGGER.debug("%s: Cache miss", key)
+            return None
+
+        # If async_initialize has been called and the key is not in self._files
+        # then the file does not exist
+        if key not in self._files:
+            _LOGGER.debug("%s: Cache hit, does not exist", key)
+            return (False, None)
+
+        # If the key is in the preload cache, return it
+        if data := self._data_preload.pop(key, None):
+            _LOGGER.debug("%s: Cache hit data", key)
+            return (True, data)
+
+        _LOGGER.debug("%s: Cache miss, not preloaded", key)
+        return None
+
+    @callback
+    def _async_schedule_cleanup(self, _event: Event) -> None:
+        """Schedule the cleanup of old files."""
+        self._cancel_cleanup = self._hass.loop.call_later(
+            MANAGER_CLEANUP_DELAY, self._async_cleanup
+        )
+        # Handle the case where we stop in the first 60s
+        self._hass.bus.async_listen_once(
+            EVENT_HOMEASSISTANT_STOP,
+            self._async_cancel_and_cleanup,
+            run_immediately=True,
+        )
+
+    @callback
+    def _async_cancel_and_cleanup(self, _event: Event) -> None:
+        """Cancel the cleanup of old files."""
+        self._async_cleanup()
+        if self._cancel_cleanup:
+            self._cancel_cleanup.cancel()
+            self._cancel_cleanup = None
+
+    @callback
+    def _async_cleanup(self) -> None:
+        """Cleanup unused cache.
+
+        If nothing consumes the cache 60s after startup or when we
+        stop Home Assistant, we'll clear the cache.
+        """
+        self._data_preload.clear()
+
+    async def async_preload(self, keys: Iterable[str]) -> None:
+        """Cache the keys."""
+        # If async_initialize has not been called yet, we can't preload
+        if self._files is not None and (existing := self._files.intersection(keys)):
+            await self._hass.async_add_executor_job(self._preload, existing)
+
+    def _preload(self, keys: Iterable[str]) -> None:
+        """Cache the keys."""
+        storage_path = self._storage_path
+        data_preload = self._data_preload
+        for key in keys:
+            storage_file: Path = storage_path.joinpath(key)
+            try:
+                if storage_file.is_file():
+                    data_preload[key] = json_util.load_json(storage_file)
+            except Exception as ex:  # pylint: disable=broad-except
+                _LOGGER.debug("Error loading %s: %s", key, ex)
+
+    def _initialize_files(self) -> None:
+        """Initialize the cache."""
+        if self._storage_path.exists():
+            self._files = set(os.listdir(self._storage_path))
+
+
 @bind_hass
 class Store(Generic[_T]):
     """Class to help storing data."""
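To make the caching contract above concrete: async_fetch distinguishes a plain cache miss from a cached "file does not exist" answer, and Store._async_load_data (changed in a later hunk) relies on exactly that distinction. A minimal sketch, assuming a hypothetical helper name and key; only the manager API is from the diff:

from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import get_internal_store_manager


def describe_cache_state(hass: HomeAssistant, key: str) -> str:
    """Hypothetical helper illustrating the async_fetch() return contract."""
    manager = get_internal_store_manager(hass)
    result = manager.async_fetch(key)
    if result is None:
        # Cache miss: manager not initialized yet, key invalidated, key contains
        # "/", or the file was simply not preloaded; Store reads the file as before.
        return "miss"
    exists, data = result
    if not exists:
        # The .storage directory listing proved the file is absent, so Store
        # returns None without scheduling an executor job.
        return "known missing"
    # Preloaded JSON payload; async_fetch pops it, so a second fetch is a miss.
    return f"preloaded: {data!r}"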
@@ -103,6 +251,7 @@ class Store(Generic[_T]):
         encoder: type[JSONEncoder] | None = None,
         minor_version: int = 1,
         read_only: bool = False,
+        config_dir: str | None = None,
     ) -> None:
         """Initialize storage class."""
         self.version = version
@@ -119,6 +268,7 @@ class Store(Generic[_T]):
         self._atomic_writes = atomic_writes
         self._read_only = read_only
         self._next_write_time = 0.0
+        self._manager = get_internal_store_manager(hass, config_dir)
 
     @cached_property
     def path(self):
@@ -170,6 +320,10 @@ class Store(Generic[_T]):
             # We make a copy because code might assume it's safe to mutate loaded data
             # and we don't want that to mess with what we're trying to store.
             data = deepcopy(data)
+        elif cache := self._manager.async_fetch(self.key):
+            exists, data = cache
+            if not exists:
+                return None
         else:
             try:
                 data = await self.hass.async_add_executor_job(
@@ -366,6 +520,7 @@ class Store(Generic[_T]):
     async def _async_handle_write_data(self, *_args):
         """Handle writing the config."""
         async with self._write_lock:
+            self._manager.async_invalidate(self.key)
             self._async_cleanup_delay_listener()
             self._async_cleanup_final_write_listener()
 
@@ -409,6 +564,7 @@ class Store(Generic[_T]):
 
     async def async_remove(self) -> None:
         """Remove all data."""
+        self._manager.async_invalidate(self.key)
         self._async_cleanup_delay_listener()
         self._async_cleanup_final_write_listener()
 
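Taken together, the Store-side hooks keep the cache coherent: every write path and async_remove() invalidate the key first, and the preload cache clears itself MANAGER_CLEANUP_DELAY (60 s) after EVENT_HOMEASSISTANT_STARTED, or at EVENT_HOMEASSISTANT_STOP, if nothing consumed it. A rough usage sketch; the storage key and payload are made up, everything else is the existing Store API:

from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store


async def example(hass: HomeAssistant) -> None:
    # "my_integration" is a hypothetical storage key.
    store: Store[dict] = Store(hass, 1, "my_integration")

    data = await store.async_load()  # served from the preload cache when available
    if data is None:
        data = {}
    data["enabled"] = True
    await store.async_save(data)  # async_invalidate() drops any cached copy first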