1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2026-04-02 08:12:47 +01:00
Files
supervisor/supervisor/docker/interface.py
Stefan Agner a4a17a70a5 Add specific error message for registry authentication failures (#6678)
* Add specific error message for registry authentication failures

When a Docker image pull fails with 401 Unauthorized and registry
credentials are configured, raise DockerRegistryAuthError instead of
a generic DockerError. This surfaces a clear message to the user
("Docker registry authentication failed for <registry>. Check your
registry credentials") instead of "An unknown error occurred with
addon <name>".

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add tests for registry authentication error handling

Test that a 401 during image pull raises DockerRegistryAuthError when
credentials are configured, and falls back to generic DockerError
when no credentials are present.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add tests for addon install/update/rebuild auth failure handling

Test that DockerRegistryAuthError propagates correctly through
addon install, update, and rebuild paths without being wrapped
in a generic AddonUnknownError.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 09:29:49 +02:00

642 lines
23 KiB
Python

"""Interface class for Supervisor Docker object."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Awaitable
from contextlib import suppress
from http import HTTPStatus
import logging
from time import time
from typing import Any
from uuid import uuid4
import aiodocker
import aiohttp
from awesomeversion import AwesomeVersion
from awesomeversion.strategy import AwesomeVersionStrategy
from ..const import (
ATTR_PASSWORD,
ATTR_REGISTRY,
ATTR_USERNAME,
LABEL_ARCH,
LABEL_VERSION,
BusEvent,
CpuArch,
)
from ..coresys import CoreSys
from ..exceptions import (
DockerAPIError,
DockerError,
DockerHubRateLimitExceeded,
DockerJobError,
DockerNotFound,
DockerRegistryAuthError,
)
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency
from ..jobs.decorator import Job
from ..jobs.job_group import JobGroup
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.sentry import async_capture_exception
from .const import DOCKER_HUB, DOCKER_HUB_LEGACY, ContainerState, RestartPolicy
from .manager import CommandReturn, ExecReturn, PullLogEntry
from .monitor import DockerContainerStateEvent
from .pull_progress import ImagePullProgress
from .stats import DockerStats
_LOGGER: logging.Logger = logging.getLogger(__name__)

# Docker platform string used when pulling or checking an image, keyed by the
# Supervisor CPU architecture. Lookups for any other architecture raise
# KeyError (see install/check_image).
MAP_ARCH: dict[CpuArch, str] = {
    CpuArch.AARCH64: "linux/arm64",
    CpuArch.AMD64: "linux/amd64",
}
def _restart_policy_from_model(meta_host: dict[str, Any]) -> RestartPolicy | None:
    """Translate a container's HostConfig into a RestartPolicy.

    Returns None when HostConfig has no "RestartPolicy" entry at all. An
    empty policy name maps to RestartPolicy.NO. An unrecognized name is
    logged as a warning and also maps to RestartPolicy.NO. Any other name
    returns the matching enum member.
    """
    if "RestartPolicy" not in meta_host:
        return None

    policy_name = meta_host["RestartPolicy"].get("Name")
    if policy_name and policy_name in RestartPolicy:
        return RestartPolicy(policy_name)

    # Only warn for a non-empty name; an empty name silently means "no".
    if policy_name:
        _LOGGER.warning(
            "Unknown Docker restart policy '%s', treating as no", policy_name
        )
    return RestartPolicy.NO
def _container_state_from_model(container_metadata: dict[str, Any]) -> ContainerState:
    """Derive a ContainerState from a container inspect result.

    - No "State" section gives UNKNOWN.
    - A non-running container is FAILED when its exit code is positive,
      otherwise STOPPED.
    - A running container without a health check is RUNNING.
    - A running container with a health check is HEALTHY only when its
      health status is exactly "healthy", otherwise UNHEALTHY.
    """
    if "State" not in container_metadata:
        return ContainerState.UNKNOWN

    state = container_metadata["State"]
    if state["Status"] != "running":
        return ContainerState.FAILED if state["ExitCode"] > 0 else ContainerState.STOPPED

    if "Health" not in state:
        return ContainerState.RUNNING
    if state["Health"]["Status"] == "healthy":
        return ContainerState.HEALTHY
    return ContainerState.UNHEALTHY
class DockerInterface(JobGroup, ABC):
    """Docker Supervisor interface.

    Base wrapper around one named Docker container and the image it runs.
    The most recent inspect result (of the container, or of the image as a
    fallback) is cached in ``_meta`` and exposed through the ``meta_*`` and
    derived properties. Subclasses must implement ``name``; ``run`` and
    ``execute_command`` raise NotImplementedError here.
    """

    def __init__(self, coresys: CoreSys):
        """Initialize Docker base wrapper."""
        super().__init__(
            coresys,
            # defaultdict(str) makes any other placeholder in the group-name
            # template format as "" instead of raising KeyError. Fall back to
            # a random hex id when this instance has no name.
            JOB_GROUP_DOCKER_INTERFACE.format_map(
                defaultdict(str, name=self.name or uuid4().hex)
            ),
            self.name,
        )
        self.coresys: CoreSys = coresys
        # Cached inspect output of the container or image; None until loaded.
        self._meta: dict[str, Any] | None = None

    @property
    def timeout(self) -> int:
        """Return timeout for Docker actions.

        Passed to stop_container/restart_container (presumably seconds;
        confirm against the Docker manager).
        """
        return 10

    @property
    @abstractmethod
    def name(self) -> str:
        """Return name of Docker container."""

    @property
    def meta_config(self) -> dict[str, Any]:
        """Return meta data of configuration for container/image."""
        if not self._meta:
            return {}
        return self._meta.get("Config", {})

    @property
    def meta_host(self) -> dict[str, Any]:
        """Return meta data of configuration for host."""
        if not self._meta:
            return {}
        return self._meta.get("HostConfig", {})

    @property
    def meta_labels(self) -> dict[str, str]:
        """Return meta data of labels for container/image."""
        # "Labels" may be present but null, hence `or {}` rather than a default.
        return self.meta_config.get("Labels") or {}

    @property
    def meta_mounts(self) -> list[dict[str, Any]]:
        """Return meta data of mounts for container/image."""
        if not self._meta:
            return []
        return self._meta.get("Mounts", [])

    @staticmethod
    def _split_image_tag(reference: str) -> tuple[str, str]:
        """Split an image reference into (repository, tag).

        Any ``@digest`` suffix is dropped. The tag is the text after the last
        ``:`` only when that text contains no ``/``; otherwise the colon
        belongs to a registry ``host:port`` and there is no tag. The tag is
        "" when absent. Example: ``"host:5000/repo:1.0"`` gives
        ``("host:5000/repo", "1.0")``.
        """
        name = reference.partition("@")[0]
        repository, sep, tag = name.rpartition(":")
        if not sep or "/" in tag:
            return name, ""
        return repository, tag

    @property
    def image(self) -> str | None:
        """Return name of Docker image (without tag or digest).

        Returns None when the cached config has no "Image" entry.
        """
        try:
            return self._split_image_tag(self.meta_config["Image"])[0]
        except KeyError:
            return None

    @property
    def version(self) -> AwesomeVersion | None:
        """Return version of Docker image, read from its version label."""
        if LABEL_VERSION not in self.meta_labels:
            return None
        return AwesomeVersion(self.meta_labels[LABEL_VERSION])

    @property
    def arch(self) -> str | None:
        """Return arch of Docker image, read from its arch label."""
        return self.meta_labels.get(LABEL_ARCH)

    @property
    def in_progress(self) -> bool:
        """Return True if a task is in progress."""
        return self.active_job is not None

    @property
    def restart_policy(self) -> RestartPolicy | None:
        """Return restart policy of container."""
        return _restart_policy_from_model(self.meta_host)

    @property
    def security_opt(self) -> list[str]:
        """Control security options."""
        # Disable Seccomp. We don't officially support it and it
        # causes problems on some types of host systems.
        return ["seccomp=unconfined"]

    @property
    def healthcheck(self) -> dict[str, Any] | None:
        """Healthcheck of instance if it has one."""
        return self.meta_config.get("Healthcheck")

    def _get_credentials(self, image: str) -> tuple[dict[str, str], str]:
        """Return credentials for docker login and the qualified image name.

        Returns a tuple of (credentials_dict, qualified_image) where the image
        is prefixed with the registry when needed. This ensures aiodocker sets
        the correct ServerAddress in the X-Registry-Auth header, which Docker's
        containerd image store requires to match the actual registry host.
        The credentials dict is empty when no registry is configured for the
        image.
        """
        credentials: dict[str, str] = {}
        registry = self.sys_docker.config.get_registry_for_image(image)
        qualified_image = image

        if registry:
            stored = self.sys_docker.config.registries[registry]
            credentials[ATTR_USERNAME] = stored[ATTR_USERNAME]
            credentials[ATTR_PASSWORD] = stored[ATTR_PASSWORD]
            credentials[ATTR_REGISTRY] = registry

            # For Docker Hub images, the image name typically lacks a registry
            # prefix (e.g. "homeassistant/foo" instead of "docker.io/homeassistant/foo").
            # aiodocker derives ServerAddress from image.partition("/"), so without
            # the prefix it would use the namespace ("homeassistant") as ServerAddress,
            # which Docker's containerd resolver rejects as a host mismatch.
            if registry in (DOCKER_HUB, DOCKER_HUB_LEGACY):
                qualified_image = f"{DOCKER_HUB}/{image}"

            _LOGGER.debug(
                "Logging in to %s as %s",
                registry,
                stored[ATTR_USERNAME],
            )

        return credentials, qualified_image

    @Job(
        name="docker_interface_install",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
        internal=True,
    )
    async def install(
        self,
        version: AwesomeVersion,
        image: str | None = None,
        latest: bool = False,
        arch: CpuArch | None = None,
    ) -> None:
        """Pull docker image.

        Pulls ``image`` (default: ``self.image``) at tag ``version`` for
        ``arch`` (default: the Supervisor's arch). Pull progress is reported
        on the current job. When ``latest`` is set the image is also tagged
        "latest". The pulled image's metadata is then cached in ``_meta``.

        Raises:
            ValueError: No image name could be determined.
            DockerHubRateLimitExceeded: The registry answered 429. A
                DOCKER_RATELIMIT resolution issue is created first.
            DockerRegistryAuthError: The registry answered 401 while
                credentials were configured for the image.
            DockerError: Any other Docker API error during pull or tag.
        """
        image = image or self.image
        if not image:
            raise ValueError("Cannot pull without an image!")

        image_arch = arch or self.sys_arch.supervisor
        platform = MAP_ARCH[image_arch]
        pull_progress = ImagePullProgress()
        current_job = self.sys_jobs.current

        # Try to fetch manifest for accurate size-based progress
        # This is optional - if it fails, we fall back to count-based progress
        try:
            manifest = await self.sys_docker.manifest_fetcher.get_manifest(
                image, str(version), platform=platform
            )
            if manifest:
                pull_progress.set_manifest(manifest)
                _LOGGER.debug(
                    "Using manifest for progress: %d layers, %d bytes",
                    manifest.layer_count,
                    manifest.total_size,
                )
        except (aiohttp.ClientError, TimeoutError) as err:
            _LOGGER.warning("Could not fetch manifest for progress: %s", err)

        async def process_pull_event(event: PullLogEntry) -> None:
            """Process pull event and update job progress."""
            # The bus delivers pull events for every job; only handle ours.
            if event.job_id != current_job.uuid:
                return

            try:
                # Process event through progress tracker
                pull_progress.process_event(event)

                # Update job if progress changed significantly (>= 1%)
                should_update, progress = pull_progress.should_update_job()
                if should_update:
                    stage = pull_progress.get_stage()
                    current_job.update(progress=progress, stage=stage)
            except ValueError as err:
                # Catch ValueError from progress tracking (e.g. "Cannot update a job
                # that is done") which can occur under rare event combinations.
                # Log with context and send to Sentry. Continue the pull anyway as
                # progress updates are informational only.
                _LOGGER.warning(
                    "Received an unprocessable update for pull progress (layer: %s, status: %s, progress: %s): %s",
                    event.id,
                    event.status,
                    event.progress,
                    err,
                )
                await async_capture_exception(err)
            except Exception as err:  # pylint: disable=broad-except
                # Catch any other unexpected errors in progress tracking to prevent
                # pull from failing. Progress updates are informational - the pull
                # itself should continue. Send to Sentry for debugging.
                _LOGGER.warning(
                    "Error updating pull progress (layer: %s, status: %s): %s",
                    event.id,
                    event.status,
                    err,
                )
                await async_capture_exception(err)

        listener = self.sys_bus.register_event(
            BusEvent.DOCKER_IMAGE_PULL_UPDATE, process_pull_event
        )

        _LOGGER.info("Downloading docker image %s with tag %s.", image, version)
        # Bound up front so the except branch below can never see an unbound name.
        credentials: dict[str, str] = {}
        try:
            # Get credentials for private registries to pass to aiodocker
            credentials, pull_image_name = self._get_credentials(image)

            # Pull new image, passing credentials to aiodocker
            docker_image = await self.sys_docker.pull_image(
                current_job.uuid,
                pull_image_name,
                str(version),
                platform=platform,
                auth=credentials or None,
            )

            # Tag latest
            if latest:
                _LOGGER.info(
                    "Tagging image %s with version %s as latest", image, version
                )
                await self.sys_docker.images.tag(
                    docker_image["Id"], image, tag="latest"
                )
        except aiodocker.DockerError as err:
            if err.status == HTTPStatus.TOO_MANY_REQUESTS:
                self.sys_resolution.create_issue(
                    IssueType.DOCKER_RATELIMIT,
                    ContextType.SYSTEM,
                    suggestions=[SuggestionType.REGISTRY_LOGIN],
                )
                raise DockerHubRateLimitExceeded(_LOGGER.error) from err
            if err.status == HTTPStatus.UNAUTHORIZED and credentials:
                raise DockerRegistryAuthError(
                    _LOGGER.error, registry=credentials[ATTR_REGISTRY]
                ) from err
            await async_capture_exception(err)
            raise DockerError(
                f"Can't install {image}:{version!s}: {err}", _LOGGER.error
            ) from err
        finally:
            # Always detach the progress listener, whether the pull succeeded or not.
            self.sys_bus.remove_listener(listener)

        self._meta = docker_image

    async def exists(self) -> bool:
        """Return True if Docker image exists in local repository."""
        with suppress(aiodocker.DockerError):
            await self.sys_docker.images.inspect(f"{self.image}:{self.version!s}")
            return True
        return False

    async def _get_container(self) -> dict[str, Any] | None:
        """Get docker container, returns None if not found.

        Raises:
            DockerAPIError: Any Docker API error other than 404.
        """
        try:
            container = await self.sys_docker.containers.get(self.name)
            return await container.show()
        except aiodocker.DockerError as err:
            if err.status == HTTPStatus.NOT_FOUND:
                return None
            raise DockerAPIError(
                f"Docker API error occurred while getting container information: {err!s}"
            ) from err

    async def is_running(self) -> bool:
        """Return True if Docker is running."""
        return bool(
            (container_metadata := await self._get_container())
            and "State" in container_metadata
            and container_metadata["State"]["Running"]
        )

    async def current_state(self) -> ContainerState:
        """Return current state of container."""
        if container_metadata := await self._get_container():
            return _container_state_from_model(container_metadata)
        return ContainerState.UNKNOWN

    @Job(name="docker_interface_attach", concurrency=JobConcurrency.GROUP_QUEUE)
    async def attach(
        self, version: AwesomeVersion, *, skip_state_event_if_down: bool = False
    ) -> None:
        """Attach to running Docker container.

        Loads metadata from the container if it exists. The container is then
        registered with the monitor, and a state-change event is fired unless
        ``skip_state_event_if_down`` is set and the container is stopped or
        failed. Otherwise the image ``self.image:version`` is inspected.

        Raises:
            DockerError: Neither the container nor the image could be inspected.
        """
        with suppress(aiodocker.DockerError):
            docker_container = await self.sys_docker.containers.get(self.name)
            self._meta = await docker_container.show()
            self.sys_docker.monitor.watch_container(self._meta)

            state = _container_state_from_model(self._meta)
            if not (
                skip_state_event_if_down
                and state in [ContainerState.STOPPED, ContainerState.FAILED]
            ):
                # Fire event with current state of container
                self.sys_bus.fire_event(
                    BusEvent.DOCKER_CONTAINER_STATE_CHANGE,
                    DockerContainerStateEvent(
                        self.name, state, docker_container.id, int(time())
                    ),
                )

        # No container: fall back to the image metadata
        with suppress(aiodocker.DockerError):
            if not self._meta and self.image:
                self._meta = await self.sys_docker.images.inspect(
                    f"{self.image}:{version!s}"
                )

        # Successful?
        if not self._meta:
            raise DockerError(
                f"Could not get metadata on container or image for {self.name}"
            )
        _LOGGER.info("Attaching to %s with version %s", self.image, self.version)

    @Job(
        name="docker_interface_run",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    async def run(self) -> None:
        """Run Docker image. Must be implemented by subclasses."""
        raise NotImplementedError()

    async def _run(self, *, name: str, **kwargs) -> None:
        """Create and start a container from ``self.image``.

        Does nothing if the container is already running. Otherwise any
        existing container is stopped/removed first, and the new container's
        metadata is cached.

        Raises:
            ValueError: No image could be determined.
            DockerNotFound: The image is missing (also reported to Sentry).
        """
        if not (image := self.image):
            raise ValueError(f"Cannot determine image to use to run {self.name}!")

        if await self.is_running():
            return

        # Cleanup
        await self.stop()

        # Create & Run container
        try:
            container_metadata = await self.sys_docker.run(image, name=name, **kwargs)
        except DockerNotFound as err:
            # If image is missing, capture the exception as this shouldn't happen
            await async_capture_exception(err)
            raise

        # Store metadata
        self._meta = container_metadata

    @Job(
        name="docker_interface_stop",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    async def stop(self, remove_container: bool = True) -> None:
        """Stop/remove Docker container. A missing container is ignored."""
        with suppress(DockerNotFound):
            await self.sys_docker.stop_container(
                self.name, self.timeout, remove_container
            )

    @Job(
        name="docker_interface_start",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    def start(self) -> Awaitable[None]:
        """Start Docker container."""
        return self.sys_docker.start_container(self.name)

    @Job(
        name="docker_interface_remove",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    async def remove(self, *, remove_image: bool = True) -> None:
        """Remove Docker images.

        Stops/removes the container (errors ignored), optionally removes the
        image, and clears the cached metadata.

        Raises:
            DockerError: Image or version cannot be determined from metadata.
        """
        if not self.image or not self.version:
            raise DockerError(
                "Cannot determine image and/or version from metadata!", _LOGGER.error
            )

        # Cleanup container
        with suppress(DockerError):
            await self.stop()

        if remove_image:
            await self.sys_docker.remove_image(self.image, self.version)

        self._meta = None

    @Job(
        name="docker_interface_check_image",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    async def check_image(
        self,
        version: AwesomeVersion,
        expected_image: str,
        expected_cpu_arch: CpuArch | None = None,
    ) -> None:
        """Check we have expected image with correct arch.

        If the current image is not ``expected_image`` or its platform does
        not match, the current image is removed and the expected one pulled.
        """
        arch = expected_cpu_arch or self.sys_arch.supervisor
        image_name = f"{expected_image}:{version!s}"
        if self.image == expected_image:
            try:
                image = await self.sys_docker.images.inspect(image_name)
            except aiodocker.DockerError as err:
                raise DockerError(
                    f"Could not get {image_name} for check due to: {err!s}",
                    _LOGGER.error,
                ) from err

            image_arch = f"{image['Os']}/{image['Architecture']}"
            if "Variant" in image:
                image_arch = f"{image_arch}/{image['Variant']}"

            # If we have an image and it's the right arch, all set.
            # Newer Docker versions seem to return a variant for arm64 images,
            # so match both linux/arm64 and linux/arm64/v8 via a prefix check.
            expected_image_arch = MAP_ARCH[arch]
            if image_arch.startswith(expected_image_arch):
                return
            _LOGGER.info(
                "Image %s has arch %s, expected %s. Reinstalling.",
                image_name,
                image_arch,
                expected_image_arch,
            )

        # We're missing the image we need. Stop and clean up what we have then pull the right one
        with suppress(DockerError):
            await self.remove()
        await self.install(version, expected_image, arch=arch)

    @Job(
        name="docker_interface_update",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    async def update(
        self,
        version: AwesomeVersion,
        image: str | None = None,
        latest: bool = False,
    ) -> None:
        """Update a Docker image: pull the new version, then stop the container."""
        image = image or self.image

        _LOGGER.info(
            "Updating image %s:%s to %s:%s", self.image, self.version, image, version
        )

        # Update docker image
        await self.install(version, image=image, latest=latest)

        # Stop container & cleanup
        with suppress(DockerError):
            await self.stop()

    async def logs(self) -> list[str]:
        """Return Docker logs of container, or [] if they cannot be read."""
        with suppress(DockerError):
            return await self.sys_docker.container_logs(self.name)
        return []

    @Job(name="docker_interface_cleanup", concurrency=JobConcurrency.GROUP_QUEUE)
    async def cleanup(
        self,
        old_image: str | None = None,
        image: str | None = None,
        version: AwesomeVersion | None = None,
    ) -> None:
        """Check if old version exists and cleanup.

        Raises:
            DockerError: Image or version cannot be determined.
        """
        if not (use_image := image or self.image):
            raise DockerError("Cannot determine image from metadata!", _LOGGER.error)
        if not (use_version := version or self.version):
            raise DockerError("Cannot determine version from metadata!", _LOGGER.error)

        await self.sys_docker.cleanup_old_images(
            use_image, use_version, {old_image} if old_image else None
        )

    @Job(
        name="docker_interface_restart",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    def restart(self) -> Awaitable[None]:
        """Restart docker container."""
        return self.sys_docker.restart_container(self.name, self.timeout)

    @Job(
        name="docker_interface_execute_command",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    async def execute_command(self, command: str) -> CommandReturn:
        """Create a temporary container and run command. Not implemented here."""
        raise NotImplementedError()

    async def stats(self) -> DockerStats:
        """Read and return stats from container."""
        stats = await self.sys_docker.container_stats(self.name)
        return DockerStats(stats)

    async def is_failed(self) -> bool:
        """Return True if Docker is failing state."""
        return await self.current_state() == ContainerState.FAILED

    async def get_latest_version(self) -> AwesomeVersion:
        """Return latest version of local image.

        Lists local images matching ``self.image`` and parses each tag as an
        AwesomeVersion, skipping tags whose version strategy is unknown. The
        highest remaining version is returned.

        Raises:
            DockerNotFound: Listing images failed or no usable version tag exists.
        """
        available_version: list[AwesomeVersion] = []
        try:
            for image in await self.sys_docker.images.list(
                filters=f'{{"reference": ["{self.image}"]}}'
            ):
                # Tolerate a missing or null RepoTags entry
                for tag in image.get("RepoTags") or []:
                    # Split on the tag colon, not a registry host:port colon
                    version = AwesomeVersion(self._split_image_tag(tag)[1])
                    if version.strategy == AwesomeVersionStrategy.UNKNOWN:
                        continue
                    available_version.append(version)

            if not available_version:
                raise ValueError()

        except (aiodocker.DockerError, ValueError) as err:
            raise DockerNotFound(
                f"No version found for {self.image}", _LOGGER.info
            ) from err

        _LOGGER.info("Found %s versions: %s", self.image, available_version)

        # Return the highest version found (list is non-empty here)
        return max(available_version)

    @Job(
        name="docker_interface_run_inside",
        on_condition=DockerJobError,
        concurrency=JobConcurrency.GROUP_REJECT,
    )
    def run_inside(self, command: str) -> Awaitable[ExecReturn]:
        """Execute a command inside Docker container."""
        return self.sys_docker.container_run_inside(self.name, command)