1
0
mirror of https://github.com/home-assistant/core.git synced 2026-02-21 02:18:47 +00:00
Files
Andrej Friesen e0f39e6392 Add Pressure Stall Information (PSI) to Systemmonitor integration (#151946)
Co-authored-by: Franck Nijhof <git@frenck.dev>
Co-authored-by: Joostlek <joostlek@outlook.com>
2026-02-16 23:48:15 +01:00

172 lines
5.7 KiB
Python

"""Utils for System Monitor."""
import logging
import os
import re
from typing import Any
from psutil._common import sfan, shwtemp
import psutil_home_assistant as ha_psutil
from homeassistant.core import HomeAssistant
from .const import CPU_SENSOR_PREFIXES
_LOGGER = logging.getLogger(__name__)
SKIP_DISK_TYPES = {"proc", "tmpfs", "devtmpfs"}
def get_all_disk_mounts(
hass: HomeAssistant, psutil_wrapper: ha_psutil.PsutilWrapper
) -> set[str]:
"""Return all disk mount points on system."""
disks: set[str] = set()
for part in psutil_wrapper.psutil.disk_partitions(all=True):
if part.fstype in SKIP_DISK_TYPES:
# Ignore disks which are memory
continue
try:
if not os.path.isdir(part.mountpoint):
_LOGGER.debug(
"Mountpoint %s was excluded because it is not a directory",
part.mountpoint,
)
continue
usage = psutil_wrapper.psutil.disk_usage(part.mountpoint)
except PermissionError:
_LOGGER.debug(
"No permission for running user to access %s", part.mountpoint
)
continue
except OSError as err:
_LOGGER.debug(
"Mountpoint %s was excluded because of: %s", part.mountpoint, err
)
continue
if usage.total > 0 and part.device != "":
disks.add(part.mountpoint)
_LOGGER.debug("Adding disks: %s", ", ".join(disks))
return disks
def get_all_network_interfaces(
hass: HomeAssistant, psutil_wrapper: ha_psutil.PsutilWrapper
) -> set[str]:
"""Return all network interfaces on system."""
interfaces: set[str] = set()
for interface in psutil_wrapper.psutil.net_if_addrs():
if interface.startswith("veth"):
# Don't load docker virtual network interfaces
continue
interfaces.add(interface)
_LOGGER.debug("Adding interfaces: %s", ", ".join(interfaces))
return interfaces
def get_all_running_processes(hass: HomeAssistant) -> set[str]:
"""Return all running processes on system."""
psutil_wrapper = ha_psutil.PsutilWrapper()
processes: set[str] = set()
for proc in psutil_wrapper.psutil.process_iter(["name"]):
if proc.name() not in processes:
processes.add(proc.name())
_LOGGER.debug("Running processes: %s", ", ".join(processes))
return processes
def read_cpu_temperature(temps: dict[str, list[shwtemp]]) -> float | None:
"""Attempt to read CPU / processor temperature."""
entry: shwtemp
_LOGGER.debug("CPU Temperatures: %s", temps)
for name, entries in temps.items():
for i, entry in enumerate(entries, start=1):
# In case the label is empty (e.g. on Raspberry PI 4),
# construct it ourself here based on the sensor key name.
_label = f"{name} {i}" if not entry.label else entry.label
# check both name and label because some systems embed cpu# in the
# name, which makes label not match because label adds cpu# at end.
if _label in CPU_SENSOR_PREFIXES or name in CPU_SENSOR_PREFIXES:
return round(entry.current, 1)
return None
def read_fan_speed(fans: dict[str, list[sfan]]) -> dict[str, int]:
"""Attempt to read fan speed."""
entry: sfan
_LOGGER.debug("Fan speed: %s", fans)
if not fans:
return {}
sensor_fans: dict[str, int] = {}
for name, entries in fans.items():
for entry in entries:
_label = name if not entry.label else entry.label
sensor_fans[_label] = round(entry.current, 0)
return sensor_fans
def parse_pressure_file(file_path: str) -> dict[str, dict[str, float | int]] | None:
"""Parses a single /proc/pressure file (cpu, memory, or io).
Args:
file_path (str): The full path to the pressure file.
Returns:
dict: A dictionary containing the parsed pressure stall information,
or None if the file cannot be read or parsed.
"""
try:
with open(file_path, encoding="utf-8") as f:
content = f.read()
except OSError:
return None
data: dict[str, dict[str, float | int]] = {}
# The regex looks for 'some' and 'full' lines and captures the values.
# It accounts for floating point numbers and integer values.
# Example line: "some avg10=0.00 avg60=0.00 avg300=0.00 total=0"
pattern = re.compile(r"(some|full)\s+(.*)")
lines = content.strip().split("\n")
for line in lines:
match = pattern.match(line)
if match:
line_type, values_str = match.groups()
values: dict[str, float | int] = {}
for item in values_str.split():
try:
key, value = item.split("=")
# Convert values to float, except for 'total' which is an integer
if key == "total":
values[key] = int(value)
else:
values[key] = float(value)
except ValueError:
continue
data[line_type] = values
return data
def get_all_pressure_info() -> dict[str, Any]:
"""Parses all available pressure information from /proc/pressure/.
Returns:
dict: A dictionary containing cpu, memory, and io pressure info.
Returns an empty dictionary if no pressure files are found.
"""
pressure_info: dict[str, Any] = {}
resources = ["cpu", "memory", "io"]
for resource in resources:
file_path = f"/proc/pressure/{resource}"
parsed_data = parse_pressure_file(file_path)
if parsed_data:
pressure_info[resource] = parsed_data
return pressure_info