mirror of
https://github.com/home-assistant/core.git
synced 2025-12-24 21:06:19 +00:00
Add opt-in to Store for serializing in an executor (#157263)
This commit is contained in:
@@ -27,7 +27,6 @@ from homeassistant.core import (
|
||||
Event,
|
||||
HomeAssistant,
|
||||
callback,
|
||||
is_callback,
|
||||
)
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.loader import bind_hass
|
||||
@@ -242,8 +241,25 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
|
||||
encoder: type[JSONEncoder] | None = None,
|
||||
minor_version: int = 1,
|
||||
read_only: bool = False,
|
||||
serialize_in_event_loop: bool = True,
|
||||
) -> None:
|
||||
"""Initialize storage class."""
|
||||
"""Initialize storage class.
|
||||
|
||||
Args:
|
||||
serialize_in_event_loop: Whether to serialize data in the event loop.
|
||||
Set to True (default) if data passed to async_save and data produced by
|
||||
data_func passed to async_delay_save needs to be serialized in the event
|
||||
loop because it is not thread safe.
|
||||
|
||||
Set to False if the data passed to async_save and data produced by
|
||||
data_func passed to async_delay_save can safely be accessed from a
|
||||
separate thread, i.e. the data is thread safe and not mutated by other
|
||||
code while serialization is in progress.
|
||||
|
||||
Users should support serializing in a separate thread for stores which
|
||||
are expected to store large amounts of data to avoid blocking the event
|
||||
loop during serialization.
|
||||
"""
|
||||
self.version = version
|
||||
self.minor_version = minor_version
|
||||
self.key = key
|
||||
@@ -259,6 +275,7 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
|
||||
self._read_only = read_only
|
||||
self._next_write_time = 0.0
|
||||
self._manager = get_internal_store_manager(hass)
|
||||
self._serialize_in_event_loop = serialize_in_event_loop
|
||||
|
||||
@cached_property
|
||||
def path(self):
|
||||
@@ -444,9 +461,10 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
|
||||
) -> None:
|
||||
"""Save data with an optional delay.
|
||||
|
||||
data_func: A function that returns the data to save. If the function
|
||||
is decorated with @callback, it will be called in the event loop. If
|
||||
it is a regular function, it will be called from an executor.
|
||||
data_func: A function that returns the data to save. If serialize_in_event_loop
|
||||
is True, it will be called from and the returned data will be serialized in the
|
||||
in the event loop. If serialize_in_event_loop is False, it will be called from
|
||||
and the returned data will be serialized by a separate thread.
|
||||
"""
|
||||
self._data = {
|
||||
"version": self.version,
|
||||
@@ -548,8 +566,9 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
|
||||
_LOGGER.error("Error writing config for %s: %s", self.key, err)
|
||||
|
||||
async def _async_write_data(self, data: dict) -> None:
|
||||
if "data_func" in data and is_callback(data["data_func"]):
|
||||
data["data"] = data.pop("data_func")()
|
||||
if self._serialize_in_event_loop:
|
||||
if "data_func" in data:
|
||||
data["data"] = data.pop("data_func")()
|
||||
mode, json_data = json_helper.prepare_save_json(data, encoder=self._encoder)
|
||||
await self.hass.async_add_executor_job(
|
||||
self._write_prepared_data, mode, json_data
|
||||
|
||||
@@ -27,7 +27,7 @@ from homeassistant.core import (
|
||||
)
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import issue_registry as ir, storage
|
||||
from homeassistant.helpers.json import json_bytes
|
||||
from homeassistant.helpers.json import json_bytes, prepare_save_json
|
||||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.util.color import RGBColor
|
||||
|
||||
@@ -177,17 +177,41 @@ async def test_saving_with_delay_threading(tmp_path: Path) -> None:
|
||||
calls.append("callback")
|
||||
return MOCK_DATA2
|
||||
|
||||
store = storage.Store(hass, MOCK_VERSION, MOCK_KEY)
|
||||
store.async_delay_save(data_producer_thread_safe, 1)
|
||||
def mock_prepare_thread_safe(*args, **kwargs):
|
||||
"""Mock prepare thread safe."""
|
||||
assert threading.get_ident() != hass.loop_thread_id
|
||||
return prepare_save_json(*args, **kwargs)
|
||||
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))
|
||||
await hass.async_block_till_done()
|
||||
def mock_prepare_not_thread_safe(*args, **kwargs):
|
||||
"""Mock prepare not thread safe."""
|
||||
assert threading.get_ident() == hass.loop_thread_id
|
||||
return prepare_save_json(*args, **kwargs)
|
||||
|
||||
store = storage.Store(hass, MOCK_VERSION, MOCK_KEY2)
|
||||
store.async_delay_save(data_producer_callback, 1)
|
||||
with patch(
|
||||
"homeassistant.helpers.storage.json_helper.prepare_save_json",
|
||||
wraps=mock_prepare_thread_safe,
|
||||
) as mock_prepare:
|
||||
store = storage.Store(
|
||||
hass, MOCK_VERSION, MOCK_KEY, serialize_in_event_loop=False
|
||||
)
|
||||
store.async_delay_save(data_producer_thread_safe, 1)
|
||||
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))
|
||||
await hass.async_block_till_done()
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
mock_prepare.assert_called_once()
|
||||
|
||||
with patch(
|
||||
"homeassistant.helpers.storage.json_helper.prepare_save_json",
|
||||
wraps=mock_prepare_not_thread_safe,
|
||||
) as mock_prepare:
|
||||
store = storage.Store(hass, MOCK_VERSION, MOCK_KEY2)
|
||||
store.async_delay_save(data_producer_callback, 1)
|
||||
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
mock_prepare.assert_called_once()
|
||||
|
||||
assert calls == ["thread_safe", "callback"]
|
||||
expected_data = (
|
||||
@@ -216,6 +240,75 @@ async def test_saving_with_delay_threading(tmp_path: Path) -> None:
|
||||
await hass.async_stop(force=True)
|
||||
|
||||
|
||||
async def test_saving_with_threading(tmp_path: Path) -> None:
|
||||
"""Test thread handling when saving."""
|
||||
|
||||
async def assert_storage_data(store_key: str, expected_data: str) -> None:
|
||||
"""Assert storage data."""
|
||||
|
||||
def read_storage_data(store_key: str) -> str:
|
||||
"""Read storage data."""
|
||||
return Path(tmp_path / f".storage/{store_key}").read_text(encoding="utf-8")
|
||||
|
||||
store_data = await asyncio.to_thread(read_storage_data, store_key)
|
||||
assert store_data == expected_data
|
||||
|
||||
async with async_test_home_assistant(config_dir=tmp_path) as hass:
|
||||
|
||||
def mock_prepare_thread_safe(*args, **kwargs):
|
||||
"""Mock prepare thread safe."""
|
||||
assert threading.get_ident() != hass.loop_thread_id
|
||||
return prepare_save_json(*args, **kwargs)
|
||||
|
||||
def mock_prepare_not_thread_safe(*args, **kwargs):
|
||||
"""Mock prepare not thread safe."""
|
||||
assert threading.get_ident() == hass.loop_thread_id
|
||||
return prepare_save_json(*args, **kwargs)
|
||||
|
||||
with patch(
|
||||
"homeassistant.helpers.storage.json_helper.prepare_save_json",
|
||||
wraps=mock_prepare_thread_safe,
|
||||
) as mock_prepare:
|
||||
store = storage.Store(
|
||||
hass, MOCK_VERSION, MOCK_KEY, serialize_in_event_loop=False
|
||||
)
|
||||
await store.async_save(MOCK_DATA)
|
||||
mock_prepare.assert_called_once()
|
||||
|
||||
with patch(
|
||||
"homeassistant.helpers.storage.json_helper.prepare_save_json",
|
||||
wraps=mock_prepare_not_thread_safe,
|
||||
) as mock_prepare:
|
||||
store = storage.Store(hass, MOCK_VERSION, MOCK_KEY2)
|
||||
await store.async_save(MOCK_DATA2)
|
||||
mock_prepare.assert_called_once()
|
||||
|
||||
expected_data = (
|
||||
"{\n"
|
||||
' "version": 1,\n'
|
||||
' "minor_version": 1,\n'
|
||||
' "key": "storage-test",\n'
|
||||
' "data": {\n'
|
||||
' "hello": "world"\n'
|
||||
" }\n"
|
||||
"}"
|
||||
)
|
||||
await assert_storage_data(MOCK_KEY, expected_data)
|
||||
expected_data = (
|
||||
"{\n"
|
||||
' "version": 1,\n'
|
||||
' "minor_version": 1,\n'
|
||||
' "key": "storage-test-2",\n'
|
||||
' "data": {\n'
|
||||
' "goodbye": "cruel world"\n'
|
||||
" }\n"
|
||||
"}"
|
||||
)
|
||||
await assert_storage_data(MOCK_KEY2, expected_data)
|
||||
|
||||
await hass.async_stop(force=True)
|
||||
|
||||
|
||||
async def test_saving_with_delay_churn_reduction(
|
||||
hass: HomeAssistant,
|
||||
store: storage.Store,
|
||||
|
||||
Reference in New Issue
Block a user