mirror of
https://github.com/home-assistant/core.git
synced 2025-12-24 21:06:19 +00:00
Merge action and condition traces (#47373)
* Merge action and condition traces * Update __init__.py * Add typing to AutomationTrace * Make trace_get prepare a new trace by default * Correct typing of trace_cv * Fix tests
This commit is contained in:
@@ -1,33 +1,13 @@
|
||||
"""Helpers for script and condition tracing."""
|
||||
from collections import deque
|
||||
from contextlib import contextmanager
|
||||
from contextvars import ContextVar
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any, Deque, Dict, Generator, List, Optional, Union, cast
|
||||
|
||||
from homeassistant.helpers.typing import TemplateVarsType
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
|
||||
def trace_stack_push(trace_stack_var: ContextVar, node: Any) -> None:
|
||||
"""Push an element to the top of a trace stack."""
|
||||
trace_stack = trace_stack_var.get()
|
||||
if trace_stack is None:
|
||||
trace_stack = []
|
||||
trace_stack_var.set(trace_stack)
|
||||
trace_stack.append(node)
|
||||
|
||||
|
||||
def trace_stack_pop(trace_stack_var: ContextVar) -> None:
|
||||
"""Remove the top element from a trace stack."""
|
||||
trace_stack = trace_stack_var.get()
|
||||
trace_stack.pop()
|
||||
|
||||
|
||||
def trace_stack_top(trace_stack_var: ContextVar) -> Optional[Any]:
|
||||
"""Return the element at the top of a trace stack."""
|
||||
trace_stack = trace_stack_var.get()
|
||||
return trace_stack[-1] if trace_stack else None
|
||||
|
||||
|
||||
class TraceElement:
|
||||
"""Container for trace data."""
|
||||
|
||||
@@ -62,17 +42,105 @@ class TraceElement:
|
||||
return result
|
||||
|
||||
|
||||
# Context variables for tracing
|
||||
# Current trace
|
||||
trace_cv: ContextVar[Optional[Dict[str, Deque[TraceElement]]]] = ContextVar(
|
||||
"trace_cv", default=None
|
||||
)
|
||||
# Stack of TraceElements
|
||||
trace_stack_cv: ContextVar[Optional[List[TraceElement]]] = ContextVar(
|
||||
"trace_stack_cv", default=None
|
||||
)
|
||||
# Current location in config tree
|
||||
trace_path_stack_cv: ContextVar[Optional[List[str]]] = ContextVar(
|
||||
"trace_path_stack_cv", default=None
|
||||
)
|
||||
|
||||
|
||||
def trace_stack_push(trace_stack_var: ContextVar, node: Any) -> None:
|
||||
"""Push an element to the top of a trace stack."""
|
||||
trace_stack = trace_stack_var.get()
|
||||
if trace_stack is None:
|
||||
trace_stack = []
|
||||
trace_stack_var.set(trace_stack)
|
||||
trace_stack.append(node)
|
||||
|
||||
|
||||
def trace_stack_pop(trace_stack_var: ContextVar) -> None:
|
||||
"""Remove the top element from a trace stack."""
|
||||
trace_stack = trace_stack_var.get()
|
||||
trace_stack.pop()
|
||||
|
||||
|
||||
def trace_stack_top(trace_stack_var: ContextVar) -> Optional[Any]:
|
||||
"""Return the element at the top of a trace stack."""
|
||||
trace_stack = trace_stack_var.get()
|
||||
return trace_stack[-1] if trace_stack else None
|
||||
|
||||
|
||||
def trace_path_push(suffix: Union[str, List[str]]) -> int:
|
||||
"""Go deeper in the config tree."""
|
||||
if isinstance(suffix, str):
|
||||
suffix = [suffix]
|
||||
for node in suffix:
|
||||
trace_stack_push(trace_path_stack_cv, node)
|
||||
return len(suffix)
|
||||
|
||||
|
||||
def trace_path_pop(count: int) -> None:
|
||||
"""Go n levels up in the config tree."""
|
||||
for _ in range(count):
|
||||
trace_stack_pop(trace_path_stack_cv)
|
||||
|
||||
|
||||
def trace_path_get() -> str:
|
||||
"""Return a string representing the current location in the config tree."""
|
||||
path = trace_path_stack_cv.get()
|
||||
if not path:
|
||||
return ""
|
||||
return "/".join(path)
|
||||
|
||||
|
||||
def trace_append_element(
|
||||
trace_var: ContextVar,
|
||||
trace_element: TraceElement,
|
||||
path: str,
|
||||
maxlen: Optional[int] = None,
|
||||
) -> None:
|
||||
"""Append a TraceElement to trace[path]."""
|
||||
trace = trace_var.get()
|
||||
trace = trace_cv.get()
|
||||
if trace is None:
|
||||
trace_var.set({})
|
||||
trace = trace_var.get()
|
||||
trace = {}
|
||||
trace_cv.set(trace)
|
||||
if path not in trace:
|
||||
trace[path] = deque(maxlen=maxlen)
|
||||
trace[path].append(trace_element)
|
||||
|
||||
|
||||
def trace_get(clear: bool = True) -> Optional[Dict[str, Deque[TraceElement]]]:
|
||||
"""Return the current trace."""
|
||||
if clear:
|
||||
trace_clear()
|
||||
return trace_cv.get()
|
||||
|
||||
|
||||
def trace_clear() -> None:
|
||||
"""Clear the trace."""
|
||||
trace_cv.set({})
|
||||
trace_stack_cv.set(None)
|
||||
trace_path_stack_cv.set(None)
|
||||
|
||||
|
||||
def trace_set_result(**kwargs: Any) -> None:
|
||||
"""Set the result of TraceElement at the top of the stack."""
|
||||
node = cast(TraceElement, trace_stack_top(trace_stack_cv))
|
||||
node.set_result(**kwargs)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def trace_path(suffix: Union[str, List[str]]) -> Generator:
|
||||
"""Go deeper in the config tree."""
|
||||
count = trace_path_push(suffix)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
trace_path_pop(count)
|
||||
|
||||
Reference in New Issue
Block a user