1
0
mirror of https://github.com/home-assistant/core.git synced 2026-06-08 08:27:18 +01:00

Add disk space coordinator for Portainer (#165855)

This commit is contained in:
Erwin Douna
2026-05-07 20:05:29 +02:00
committed by GitHub
parent db25f1911e
commit 2c8d9c7207
5 changed files with 320 additions and 149 deletions
+32 -5
View File
@@ -20,10 +20,11 @@ import homeassistant.helpers.config_validation as cv
import homeassistant.helpers.device_registry as dr
from homeassistant.helpers.device_registry import DeviceEntry
import homeassistant.helpers.entity_registry as er
from homeassistant.helpers.start import async_at_started
from homeassistant.helpers.typing import ConfigType
from .const import API_MAX_RETRIES, DOMAIN
from .coordinator import PortainerCoordinator
from .coordinator import PortainerCoordinator, PortainerDockerDiskSpaceCoordinator
from .services import async_setup_services
_PLATFORMS: list[Platform] = [
@@ -43,19 +44,45 @@ _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: PortainerConfigEntry) -> bool:
"""Set up Portainer from a config entry."""
session = async_create_clientsession(
hass=hass, verify_ssl=entry.data[CONF_VERIFY_SSL]
)
client = Portainer(
api_url=entry.data[CONF_URL],
api_key=entry.data[CONF_API_TOKEN],
session=async_create_clientsession(
hass=hass, verify_ssl=entry.data[CONF_VERIFY_SSL]
),
request_timeout=30,
session=session,
request_timeout=10,
max_retries=API_MAX_RETRIES,
)
coordinator = PortainerCoordinator(hass, entry, client)
await coordinator.async_config_entry_first_refresh()
docker_system_df_client = Portainer(
api_url=entry.data[CONF_URL],
api_key=entry.data[CONF_API_TOKEN],
session=session,
request_timeout=60,
max_retries=API_MAX_RETRIES,
)
docker_disk_space_coordinator = PortainerDockerDiskSpaceCoordinator(
hass, entry, docker_system_df_client
)
coordinator.docker_disk_space = docker_disk_space_coordinator
async def _defer_docker_disk_space_refresh(_: HomeAssistant) -> None:
"""Defer the first refresh until Home Assistant has started."""
hass.async_create_task(
docker_disk_space_coordinator.async_refresh(),
"portainer_docker_disk_space_initial_refresh",
)
# On lower-end hardware, the DF endpoint can take long
# Do not block the setup, but defer the first refresh until HA is fully started
entry.async_on_unload(async_at_started(hass, _defer_docker_disk_space_refresh))
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, _PLATFORMS)
+168 -126
View File
@@ -1,5 +1,6 @@
"""Data Update Coordinator for Portainer."""
from abc import abstractmethod
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
@@ -38,6 +39,7 @@ type PortainerConfigEntry = ConfigEntry[PortainerCoordinator]
_LOGGER = logging.getLogger(__name__)
DEFAULT_SCAN_INTERVAL = timedelta(seconds=60)
DEFAULT_DF_SCAN_INTERVAL = timedelta(minutes=30)
@dataclass
@@ -50,7 +52,6 @@ class PortainerCoordinatorData:
containers: dict[str, PortainerContainerData]
docker_version: DockerVersion
docker_info: DockerInfo
docker_system_df: DockerSystemDF
stacks: dict[str, PortainerStackData]
volumes: dict[str, PortainerVolumeData]
@@ -80,10 +81,11 @@ class PortainerVolumeData:
volume: DockerVolume
class PortainerCoordinator(DataUpdateCoordinator[dict[int, PortainerCoordinatorData]]):
"""Data Update Coordinator for Portainer."""
class PortainerBaseCoordinator[_DataT](DataUpdateCoordinator[_DataT]):
"""Base coordinator for Portainer."""
config_entry: PortainerConfigEntry
_update_interval: timedelta
def __init__(
self,
@@ -91,13 +93,13 @@ class PortainerCoordinator(DataUpdateCoordinator[dict[int, PortainerCoordinatorD
config_entry: PortainerConfigEntry,
portainer: Portainer,
) -> None:
"""Initialize the Portainer Data Update Coordinator."""
"""Initialize."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=DOMAIN,
update_interval=DEFAULT_SCAN_INTERVAL,
update_interval=self._update_interval,
)
self.portainer = portainer
@@ -144,7 +146,44 @@ class PortainerCoordinator(DataUpdateCoordinator[dict[int, PortainerCoordinatorD
translation_placeholders={"error": repr(err)},
) from err
async def _async_update_data(self) -> dict[int, PortainerCoordinatorData]:
@abstractmethod
async def update_data(self) -> _DataT:
"""Update coordinator data."""
async def _async_update_data(self) -> _DataT:
"""Fetch per coordinator specific data."""
try:
return await self.update_data()
except PortainerAuthenticationError as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_auth",
translation_placeholders={"error": repr(err)},
) from err
except PortainerConnectionError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="cannot_connect",
translation_placeholders={"error": repr(err)},
) from err
except PortainerTimeoutError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="timeout_connect",
translation_placeholders={"error": repr(err)},
) from err
class PortainerCoordinator(
PortainerBaseCoordinator[dict[int, PortainerCoordinatorData]]
):
"""Data Update Coordinator for Portainer."""
config_entry: PortainerConfigEntry
docker_disk_space: PortainerDockerDiskSpaceCoordinator | None = None
_update_interval = DEFAULT_SCAN_INTERVAL
async def update_data(self) -> dict[int, PortainerCoordinatorData]:
"""Fetch data from Portainer API."""
_LOGGER.debug(
"Fetching data from Portainer API: %s", self.config_entry.data[CONF_URL]
@@ -176,134 +215,119 @@ class PortainerCoordinator(DataUpdateCoordinator[dict[int, PortainerCoordinatorD
)
continue
try:
(
containers,
docker_version,
docker_info,
docker_system_df,
volumes,
) = await asyncio.gather(
self.portainer.get_containers(endpoint.id),
self.portainer.docker_version(endpoint.id),
self.portainer.docker_info(endpoint.id),
self.portainer.docker_system_df(endpoint.id, verbose=True),
self.portainer.get_volumes(endpoint.id),
(
containers,
docker_version,
docker_info,
docker_system_df,
volumes,
) = await asyncio.gather(
self.portainer.get_containers(endpoint.id),
self.portainer.docker_version(endpoint.id),
self.portainer.docker_info(endpoint.id),
self.portainer.docker_system_df(endpoint.id, verbose=True),
self.portainer.get_volumes(endpoint.id),
)
stack_requests = [self.portainer.get_stacks(endpoint_id=endpoint.id)]
swarm_id = (
docker_info.swarm.cluster.get("ID")
if docker_info.swarm
and docker_info.swarm.control_available
and docker_info.swarm.cluster
else None
)
if swarm_id:
stack_requests.append(
self.portainer.get_stacks(
endpoint_id=endpoint.id, swarm_id=swarm_id
)
)
stack_requests = [self.portainer.get_stacks(endpoint_id=endpoint.id)]
swarm_id = (
docker_info.swarm.cluster.get("ID")
if docker_info.swarm
and docker_info.swarm.control_available
and docker_info.swarm.cluster
stacks = [
stack
for result in await asyncio.gather(*stack_requests)
for stack in result
]
prev_endpoint = self.data.get(endpoint.id) if self.data else None
container_map: dict[str, PortainerContainerData] = {}
stack_map: dict[str, PortainerStackData] = {
stack.name: PortainerStackData(stack=stack, container_count=0)
for stack in stacks
}
# Map containers, started and stopped
for container in containers:
container_name = self._get_container_name(container.names[0])
prev_container = (
prev_endpoint.containers.get(container_name)
if prev_endpoint
else None
)
if swarm_id:
stack_requests.append(
self.portainer.get_stacks(
endpoint_id=endpoint.id, swarm_id=swarm_id
)
# Check if container belongs to a stack via docker compose label
stack_name: str | None = (
container.labels.get("com.docker.compose.project")
or container.labels.get("com.docker.stack.namespace")
if container.labels
else None
)
if stack_name and (stack_data := stack_map.get(stack_name)):
stack_data.container_count += 1
container_map[container_name] = PortainerContainerData(
container=container,
stats=None,
stats_pre=prev_container.stats if prev_container else None,
stack=stack_map[stack_name].stack
if stack_name and stack_name in stack_map
else None,
)
volume_usage_map = {
item["Name"]: item
for item in (docker_system_df.volume_disk_usage.items or [])
}
volume_map: dict[str, PortainerVolumeData] = {}
for volume in volumes:
if item := volume_usage_map.get(volume.name):
volume.usage_data = DockerVolumeUsageData(
size=item["UsageData"]["Size"],
ref_count=item["UsageData"]["RefCount"],
)
volume_map[volume.name] = PortainerVolumeData(volume=volume)
stacks = [
stack
for result in await asyncio.gather(*stack_requests)
for stack in result
]
prev_endpoint = self.data.get(endpoint.id) if self.data else None
container_map: dict[str, PortainerContainerData] = {}
stack_map: dict[str, PortainerStackData] = {
stack.name: PortainerStackData(stack=stack, container_count=0)
for stack in stacks
}
volume_usage_map = {
item["Name"]: item
for item in (docker_system_df.volume_disk_usage.items or [])
}
volume_map: dict[str, PortainerVolumeData] = {}
for volume in volumes:
if item := volume_usage_map.get(volume.name):
volume.usage_data = DockerVolumeUsageData(
size=item["UsageData"]["Size"],
ref_count=item["UsageData"]["RefCount"],
)
volume_map[volume.name] = PortainerVolumeData(volume=volume)
# Map containers, started and stopped
for container in containers:
container_name = self._get_container_name(container.names[0])
prev_container = (
prev_endpoint.containers.get(container_name)
if prev_endpoint
else None
)
# Check if container belongs to a stack via docker compose label
stack_name: str | None = (
container.labels.get("com.docker.compose.project")
or container.labels.get("com.docker.stack.namespace")
if container.labels
else None
)
if stack_name and (stack_data := stack_map.get(stack_name)):
stack_data.container_count += 1
container_map[container_name] = PortainerContainerData(
container=container,
stats=None,
stats_pre=prev_container.stats if prev_container else None,
stack=stack_map[stack_name].stack
if stack_name and stack_name in stack_map
else None,
)
# Separately fetch stats for active containers
active_containers = [
container
for container in containers
if container.state
in (DockerContainerState.RUNNING, DockerContainerState.PAUSED)
]
if active_containers:
container_stats = dict(
zip(
(
self._get_container_name(container.names[0])
for container in active_containers
),
await asyncio.gather(
*(
self.portainer.container_stats(
endpoint_id=endpoint.id,
container_id=container.id,
)
for container in active_containers
# Separately fetch stats for active containers
active_containers = [
container
for container in containers
if container.state
in (DockerContainerState.RUNNING, DockerContainerState.PAUSED)
]
if active_containers:
container_stats = dict(
zip(
(
self._get_container_name(container.names[0])
for container in active_containers
),
await asyncio.gather(
*(
self.portainer.container_stats(
endpoint_id=endpoint.id,
container_id=container.id,
)
),
strict=False,
)
for container in active_containers
)
),
strict=False,
)
)
# Now assign stats to the containers
for container_name, stats in container_stats.items():
container_map[container_name].stats = stats
except PortainerConnectionError as err:
_LOGGER.exception("Connection error")
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="cannot_connect",
translation_placeholders={"error": repr(err)},
) from err
except PortainerAuthenticationError as err:
_LOGGER.exception("Authentication error")
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_auth",
translation_placeholders={"error": repr(err)},
) from err
# Now assign stats to the containers
for container_name, stats in container_stats.items():
container_map[container_name].stats = stats
mapped_endpoints[endpoint.id] = PortainerCoordinatorData(
id=endpoint.id,
@@ -312,7 +336,6 @@ class PortainerCoordinator(DataUpdateCoordinator[dict[int, PortainerCoordinatorD
containers=container_map,
docker_version=docker_version,
docker_info=docker_info,
docker_system_df=docker_system_df,
volumes=volume_map,
stacks=stack_map,
)
@@ -407,3 +430,22 @@ class PortainerCoordinator(DataUpdateCoordinator[dict[int, PortainerCoordinatorD
def _get_container_name(self, container_name: str) -> str:
"""Sanitize to get a proper container name."""
return container_name.replace("/", " ").strip()
class PortainerDockerDiskSpaceCoordinator(
PortainerBaseCoordinator[dict[int, DockerSystemDF]]
):
"""Data Update Coordinator for Docker disk space."""
config_entry: PortainerConfigEntry
_update_interval = DEFAULT_DF_SCAN_INTERVAL
async def update_data(self) -> dict[int, DockerSystemDF]:
"""Fetch Docker disk space data independently from Portainer API."""
endpoints = await self.portainer.get_endpoints()
results: dict[int, DockerSystemDF] = {}
for endpoint in endpoints:
if endpoint.status == EndpointStatus.DOWN:
continue
results[endpoint.id] = await self.portainer.docker_system_df(endpoint.id)
return results
@@ -13,6 +13,7 @@ from .coordinator import (
PortainerContainerData,
PortainerCoordinator,
PortainerCoordinatorData,
PortainerDockerDiskSpaceCoordinator,
PortainerStackData,
PortainerVolumeData,
)
@@ -24,6 +25,14 @@ class PortainerCoordinatorEntity(CoordinatorEntity[PortainerCoordinator]):
_attr_has_entity_name = True
class PortainerDockerDiskSpaceCoordinatorEntity(
CoordinatorEntity[PortainerDockerDiskSpaceCoordinator]
):
"""Base class for Portainer entities using the Docker disk space coordinator."""
_attr_has_entity_name = True
class PortainerEndpointEntity(PortainerCoordinatorEntity):
"""Base implementation for Portainer endpoint."""
@@ -177,6 +186,45 @@ class PortainerStackEntity(PortainerCoordinatorEntity):
return self.coordinator.data[self.endpoint_id].stacks[self.device_name]
class PortainerDockerSystemDiskSpaceEndpointEntity(
PortainerDockerDiskSpaceCoordinatorEntity
):
"""Base class for endpoint entities backed by the docker system disk space coordinator."""
def __init__(
self,
coordinator: PortainerDockerDiskSpaceCoordinator,
entity_description: EntityDescription,
device_info: PortainerCoordinatorData,
) -> None:
"""Initialize a Portainer docker system disk space endpoint entity."""
super().__init__(coordinator)
self.entity_description = entity_description
self.endpoint_id = device_info.endpoint.id
self._device_info = device_info
self._attr_device_info = DeviceInfo(
identifiers={
(DOMAIN, f"{coordinator.config_entry.entry_id}_{self.endpoint_id}")
},
configuration_url=URL(
f"{coordinator.config_entry.data[CONF_URL]}#!/{self.endpoint_id}/docker/dashboard"
),
manufacturer=DEFAULT_NAME,
model="Endpoint",
name=device_info.endpoint.name,
)
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{device_info.endpoint.id}_{entity_description.key}"
@property
def available(self) -> bool:
"""Return if the device is available."""
return (
super().available
and self.coordinator.data is not None
and self.endpoint_id in self.coordinator.data
)
class PortainerVolumeEntity(PortainerCoordinatorEntity):
"""Base implementation for Portainer volume."""
+58 -18
View File
@@ -2,8 +2,10 @@
from collections.abc import Callable
from dataclasses import dataclass
from itertools import chain
from pyportainer import StackType
from pyportainer.models.docker import DockerSystemDF
from homeassistant.components.sensor import (
EntityCategory,
@@ -26,6 +28,7 @@ from .coordinator import (
from .entity import (
PortainerContainerEntity,
PortainerCoordinatorData,
PortainerDockerSystemDiskSpaceEndpointEntity,
PortainerEndpointEntity,
PortainerStackEntity,
PortainerVolumeEntity,
@@ -55,6 +58,13 @@ class PortainerStackSensorEntityDescription(SensorEntityDescription):
value_fn: Callable[[PortainerStackData], StateType]
@dataclass(frozen=True, kw_only=True)
class PortainerDockerSystemDiskSpaceSensorEntityDescription(SensorEntityDescription):
"""Class to hold Portainer docker system disk space sensor description."""
value_fn: Callable[[DockerSystemDF], StateType]
@dataclass(frozen=True, kw_only=True)
class PortainerVolumeSensorEntityDescription(SensorEntityDescription):
"""Class to hold Portainer volume sensor description."""
@@ -146,6 +156,7 @@ CONTAINER_SENSORS: tuple[PortainerContainerSensorEntityDescription, ...] = (
state_class=SensorStateClass.MEASUREMENT,
),
)
ENDPOINT_SENSORS: tuple[PortainerEndpointSensorEntityDescription, ...] = (
PortainerEndpointSensorEntityDescription(
key="api_version",
@@ -243,50 +254,55 @@ ENDPOINT_SENSORS: tuple[PortainerEndpointSensorEntityDescription, ...] = (
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
),
PortainerEndpointSensorEntityDescription(
)
DOCKER_SYSTEM_DISK_SPACE_SENSORS: tuple[
PortainerDockerSystemDiskSpaceSensorEntityDescription, ...
] = (
PortainerDockerSystemDiskSpaceSensorEntityDescription(
key="container_disk_usage_reclaimable",
translation_key="container_disk_usage_reclaimable",
value_fn=lambda data: data.docker_system_df.container_disk_usage.reclaimable,
value_fn=lambda data: data.container_disk_usage.reclaimable,
device_class=SensorDeviceClass.DATA_SIZE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.BYTES,
suggested_unit_of_measurement=UnitOfInformation.MEBIBYTES,
entity_category=EntityCategory.DIAGNOSTIC,
),
PortainerEndpointSensorEntityDescription(
PortainerDockerSystemDiskSpaceSensorEntityDescription(
key="container_disk_usage_total_size",
translation_key="container_disk_usage_total_size",
value_fn=lambda data: data.docker_system_df.container_disk_usage.total_size,
value_fn=lambda data: data.container_disk_usage.total_size,
device_class=SensorDeviceClass.DATA_SIZE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.BYTES,
suggested_unit_of_measurement=UnitOfInformation.MEBIBYTES,
entity_category=EntityCategory.DIAGNOSTIC,
),
PortainerEndpointSensorEntityDescription(
PortainerDockerSystemDiskSpaceSensorEntityDescription(
key="image_disk_usage_reclaimable",
translation_key="image_disk_usage_reclaimable",
value_fn=lambda data: data.docker_system_df.image_disk_usage.reclaimable,
value_fn=lambda data: data.image_disk_usage.reclaimable,
device_class=SensorDeviceClass.DATA_SIZE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.BYTES,
suggested_unit_of_measurement=UnitOfInformation.MEBIBYTES,
entity_category=EntityCategory.DIAGNOSTIC,
),
PortainerEndpointSensorEntityDescription(
PortainerDockerSystemDiskSpaceSensorEntityDescription(
key="image_disk_usage_total_size",
translation_key="image_disk_usage_total_size",
value_fn=lambda data: data.docker_system_df.image_disk_usage.total_size,
value_fn=lambda data: data.image_disk_usage.total_size,
device_class=SensorDeviceClass.DATA_SIZE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.BYTES,
suggested_unit_of_measurement=UnitOfInformation.MEBIBYTES,
entity_category=EntityCategory.DIAGNOSTIC,
),
PortainerEndpointSensorEntityDescription(
PortainerDockerSystemDiskSpaceSensorEntityDescription(
key="volume_disk_usage_total",
translation_key="volume_disk_usage_total_size",
value_fn=lambda data: data.docker_system_df.volume_disk_usage.total_size,
value_fn=lambda data: data.volume_disk_usage.total_size,
device_class=SensorDeviceClass.DATA_SIZE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.BYTES,
@@ -347,18 +363,28 @@ async def async_setup_entry(
) -> None:
"""Set up Portainer sensors based on a config entry."""
coordinator = entry.runtime_data
ds_coordinator = coordinator.docker_disk_space
assert ds_coordinator is not None
def _async_add_new_endpoints(endpoints: list[PortainerCoordinatorData]) -> None:
"""Add new endpoint sensor."""
"""Add new endpoint sensors."""
async_add_entities(
PortainerEndpointSensor(
coordinator,
entity_description,
endpoint,
chain(
(
PortainerEndpointSensor(coordinator, entity_description, endpoint)
for entity_description in ENDPOINT_SENSORS
for endpoint in endpoints
),
(
PortainerDockerSystemDiskSpaceSensor(
ds_coordinator,
entity_description,
endpoint,
)
for entity_description in DOCKER_SYSTEM_DISK_SPACE_SENSORS
for endpoint in endpoints
),
)
for entity_description in ENDPOINT_SENSORS
for endpoint in endpoints
if entity_description.value_fn(endpoint)
)
def _async_add_new_containers(
@@ -464,6 +490,20 @@ class PortainerEndpointSensor(PortainerEndpointEntity, SensorEntity):
return self.entity_description.value_fn(endpoint_data)
class PortainerDockerSystemDiskSpaceSensor(
PortainerDockerSystemDiskSpaceEndpointEntity, SensorEntity
):
"""Representation of a Portainer docker system disk space sensor."""
entity_description: PortainerDockerSystemDiskSpaceSensorEntityDescription
@property
def native_value(self) -> StateType:
"""Return the state of the sensor."""
endpoint_data = self.coordinator.data[self._device_info.endpoint.id]
return self.entity_description.value_fn(endpoint_data)
class PortainerStackSensor(PortainerStackEntity, SensorEntity):
"""Representation of a Portainer stack sensor."""
+14
View File
@@ -21,6 +21,7 @@ from homeassistant.const import (
CONF_HOST,
CONF_URL,
CONF_VERIFY_SSL,
STATE_UNAVAILABLE,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
@@ -302,6 +303,19 @@ async def test_container_stack_device_links(
assert standalone_container_device.via_device_id == endpoint_device.id
async def test_docker_system_df_refresh_runs_on_ha_start(
hass: HomeAssistant,
mock_portainer_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test docker system df coordinator refreshes DF data on HA start."""
await setup_integration(hass, mock_config_entry)
state = hass.states.get("sensor.my_environment_image_disk_usage_total_size")
assert state is not None
assert state.state != STATE_UNAVAILABLE
async def test_new_endpoint_callback(
hass: HomeAssistant,
mock_portainer_client: AsyncMock,