From a4f681586e4254ab7ea09084dd1a8600420d9efc Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 1 Dec 2025 14:07:21 +0100 Subject: [PATCH] Use count-based progress for Docker image pulls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor Docker image pull progress to use a simpler count-based approach where each layer contributes equally (100% / total_layers) regardless of size. This replaces the previous size-weighted calculation that was susceptible to progress regression. The core issue was that Docker rate-limits concurrent downloads (~3 at a time) and reports layer sizes only when downloading starts. With size- weighted progress, large layers appearing late would cause progress to drop dramatically (e.g., 59% -> 29%) as the total size increased. The new approach: - Each layer contributes equally to overall progress - Per-layer progress: 70% download weight, 30% extraction weight - Progress only starts after first "Downloading" event (when layer count is known) - Always caps at 99% - job completion handles final 100% This simplifies the code by moving progress tracking to a dedicated module (pull_progress.py) and removing complex size-based scaling logic that tried to account for unknown layer sizes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- supervisor/docker/const.py | 58 +-- supervisor/docker/interface.py | 199 +------- supervisor/docker/pull_progress.py | 310 ++++++++++++ supervisor/exceptions.py | 4 - tests/docker/test_interface.py | 43 +- tests/docker/test_pull_progress.py | 788 +++++++++++++++++++++++++++++ 6 files changed, 1148 insertions(+), 254 deletions(-) create mode 100644 supervisor/docker/pull_progress.py create mode 100644 tests/docker/test_pull_progress.py diff --git a/supervisor/docker/const.py b/supervisor/docker/const.py index a13fbb22f..ac87ae94c 100644 --- a/supervisor/docker/const.py +++ b/supervisor/docker/const.py @@ -2,19 +2,14 @@ from __future__ import annotations -from contextlib import suppress -from enum import Enum, StrEnum -from functools import total_ordering +from enum import StrEnum from pathlib import PurePath import re -from typing import cast from docker.types import Mount from ..const import MACHINE_ID -RE_RETRYING_DOWNLOAD_STATUS = re.compile(r"Retrying in \d+ seconds?") - # Docker Hub registry identifier DOCKER_HUB = "hub.docker.com" @@ -81,57 +76,6 @@ class PropagationMode(StrEnum): RSLAVE = "rslave" -@total_ordering -class PullImageLayerStage(Enum): - """Job stages for pulling an image layer. - - These are a subset of the statuses in a docker pull image log. They - are the standardized ones that are the most useful to us. - """ - - PULLING_FS_LAYER = 1, "Pulling fs layer" - RETRYING_DOWNLOAD = 2, "Retrying download" - DOWNLOADING = 2, "Downloading" - VERIFYING_CHECKSUM = 3, "Verifying Checksum" - DOWNLOAD_COMPLETE = 4, "Download complete" - EXTRACTING = 5, "Extracting" - PULL_COMPLETE = 6, "Pull complete" - - def __init__(self, order: int, status: str) -> None: - """Set fields from values.""" - self.order = order - self.status = status - - def __eq__(self, value: object, /) -> bool: - """Check equality, allow StrEnum style comparisons on status.""" - with suppress(AttributeError): - return self.status == cast(PullImageLayerStage, value).status - return self.status == value - - def __lt__(self, other: object) -> bool: - """Order instances.""" - with suppress(AttributeError): - return self.order < cast(PullImageLayerStage, other).order - return False - - def __hash__(self) -> int: - """Hash instance.""" - return hash(self.status) - - @classmethod - def from_status(cls, status: str) -> PullImageLayerStage | None: - """Return stage instance from pull log status.""" - for i in cls: - if i.status == status: - return i - - # This one includes number of seconds until download so its not constant - if RE_RETRYING_DOWNLOAD_STATUS.match(status): - return cls.RETRYING_DOWNLOAD - - return None - - ENV_TIME = "TZ" ENV_TOKEN = "SUPERVISOR_TOKEN" ENV_TOKEN_OLD = "HASSIO_TOKEN" diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index 60eb9cd33..83bbd5bef 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -19,7 +19,6 @@ import docker from docker.models.containers import Container import requests -from ..bus import EventListener from ..const import ( ATTR_PASSWORD, ATTR_REGISTRY, @@ -35,19 +34,18 @@ from ..exceptions import ( DockerError, DockerHubRateLimitExceeded, DockerJobError, - DockerLogOutOfOrder, DockerNotFound, DockerRequestError, ) -from ..jobs import SupervisorJob from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency from ..jobs.decorator import Job from ..jobs.job_group import JobGroup from ..resolution.const import ContextType, IssueType, SuggestionType from ..utils.sentry import async_capture_exception -from .const import DOCKER_HUB, ContainerState, PullImageLayerStage, RestartPolicy +from .const import DOCKER_HUB, ContainerState, RestartPolicy from .manager import CommandReturn, PullLogEntry from .monitor import DockerContainerStateEvent +from .pull_progress import ImagePullProgress from .stats import DockerStats _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -195,159 +193,6 @@ class DockerInterface(JobGroup, ABC): return credentials - def _process_pull_image_log( # noqa: C901 - self, install_job_id: str, reference: PullLogEntry - ) -> None: - """Process events fired from a docker while pulling an image, filtered to a given job id.""" - if ( - reference.job_id != install_job_id - or not reference.id - or not reference.status - or not (stage := PullImageLayerStage.from_status(reference.status)) - ): - return - - # Pulling FS Layer is our marker for a layer that needs to be downloaded and extracted. Otherwise it already exists and we can ignore - job: SupervisorJob | None = None - if stage == PullImageLayerStage.PULLING_FS_LAYER: - job = self.sys_jobs.new_job( - name="Pulling container image layer", - initial_stage=stage.status, - reference=reference.id, - parent_id=install_job_id, - internal=True, - ) - job.done = False - return - - # Find our sub job to update details of - for j in self.sys_jobs.jobs: - if j.parent_id == install_job_id and j.reference == reference.id: - job = j - break - - # There should no longer be any real risk of logs out of order anymore. - # However tests with very small images have shown that sometimes Docker - # skips stages in log. So keeping this one as a safety check on null job - if not job: - raise DockerLogOutOfOrder( - f"Received pull image log with status {reference.status} for image id {reference.id} and parent job {install_job_id} but could not find a matching job, skipping", - _LOGGER.debug, - ) - - # For progress calculation we assume downloading is 70% of time, extracting is 30% and others stages negligible - progress = job.progress - match stage: - case PullImageLayerStage.DOWNLOADING | PullImageLayerStage.EXTRACTING: - if ( - reference.progress_detail - and reference.progress_detail.current - and reference.progress_detail.total - ): - progress = ( - reference.progress_detail.current - / reference.progress_detail.total - ) - if stage == PullImageLayerStage.DOWNLOADING: - progress = 70 * progress - else: - progress = 70 + 30 * progress - case ( - PullImageLayerStage.VERIFYING_CHECKSUM - | PullImageLayerStage.DOWNLOAD_COMPLETE - ): - progress = 70 - case PullImageLayerStage.PULL_COMPLETE: - progress = 100 - case PullImageLayerStage.RETRYING_DOWNLOAD: - progress = 0 - - # No real risk of getting things out of order in current implementation - # but keeping this one in case another change to these trips us up. - if stage != PullImageLayerStage.RETRYING_DOWNLOAD and progress < job.progress: - raise DockerLogOutOfOrder( - f"Received pull image log with status {reference.status} for job {job.uuid} that implied progress was {progress} but current progress is {job.progress}, skipping", - _LOGGER.debug, - ) - - # Our filters have all passed. Time to update the job - # Only downloading and extracting have progress details. Use that to set extra - # We'll leave it around on later stages as the total bytes may be useful after that stage - # Enforce range to prevent float drift error - progress = max(0, min(progress, 100)) - if ( - stage in {PullImageLayerStage.DOWNLOADING, PullImageLayerStage.EXTRACTING} - and reference.progress_detail - and reference.progress_detail.current is not None - and reference.progress_detail.total is not None - ): - job.update( - progress=progress, - stage=stage.status, - extra={ - "current": reference.progress_detail.current, - "total": reference.progress_detail.total, - }, - ) - else: - # If we reach DOWNLOAD_COMPLETE without ever having set extra (small layers that skip - # the downloading phase), set a minimal extra so aggregate progress calculation can proceed - extra = job.extra - if stage == PullImageLayerStage.DOWNLOAD_COMPLETE and not job.extra: - extra = {"current": 1, "total": 1} - - job.update( - progress=progress, - stage=stage.status, - done=stage == PullImageLayerStage.PULL_COMPLETE, - extra=None if stage == PullImageLayerStage.RETRYING_DOWNLOAD else extra, - ) - - # Once we have received a progress update for every child job, start to set status of the main one - install_job = self.sys_jobs.get_job(install_job_id) - layer_jobs = [ - job - for job in self.sys_jobs.jobs - if job.parent_id == install_job.uuid - and job.name == "Pulling container image layer" - ] - - # First set the total bytes to be downloaded/extracted on the main job - if not install_job.extra: - total = 0 - for job in layer_jobs: - if not job.extra: - return - total += job.extra["total"] - install_job.extra = {"total": total} - else: - total = install_job.extra["total"] - - # Then determine total progress based on progress of each sub-job, factoring in size of each compared to total - progress = 0.0 - stage = PullImageLayerStage.PULL_COMPLETE - for job in layer_jobs: - if not job.extra or not job.extra.get("total"): - return - progress += job.progress * (job.extra["total"] / total) - job_stage = PullImageLayerStage.from_status(cast(str, job.stage)) - - if job_stage < PullImageLayerStage.EXTRACTING: - stage = PullImageLayerStage.DOWNLOADING - elif ( - stage == PullImageLayerStage.PULL_COMPLETE - and job_stage < PullImageLayerStage.PULL_COMPLETE - ): - stage = PullImageLayerStage.EXTRACTING - - # Ensure progress is 100 at this point to prevent float drift - if stage == PullImageLayerStage.PULL_COMPLETE: - progress = 100 - - # To reduce noise, limit updates to when result has changed by an entire percent or when stage changed - if stage != install_job.stage or progress >= install_job.progress + 1: - install_job.update(stage=stage.status, progress=max(0, min(progress, 100))) - @Job( name="docker_interface_install", on_condition=DockerJobError, @@ -367,30 +212,35 @@ class DockerInterface(JobGroup, ABC): raise ValueError("Cannot pull without an image!") image_arch = arch or self.sys_arch.supervisor - listener: EventListener | None = None + pull_progress = ImagePullProgress() + current_job = self.sys_jobs.current + + async def process_pull_event(event: PullLogEntry) -> None: + """Process pull event and update job progress.""" + if event.job_id != current_job.uuid: + return + + # Process event through progress tracker + pull_progress.process_event(event) + + # Update job if progress changed significantly (>= 1%) + should_update, progress = pull_progress.should_update_job() + if should_update: + stage = pull_progress.get_stage() + current_job.update(progress=progress, stage=stage) + + listener = self.sys_bus.register_event( + BusEvent.DOCKER_IMAGE_PULL_UPDATE, process_pull_event + ) _LOGGER.info("Downloading docker image %s with tag %s.", image, version) try: # Get credentials for private registries to pass to aiodocker credentials = self._get_credentials(image) or None - curr_job_id = self.sys_jobs.current.uuid - - async def process_pull_image_log(reference: PullLogEntry) -> None: - try: - self._process_pull_image_log(curr_job_id, reference) - except DockerLogOutOfOrder as err: - # Send all these to sentry. Missing a few progress updates - # shouldn't matter to users but matters to us - await async_capture_exception(err) - - listener = self.sys_bus.register_event( - BusEvent.DOCKER_IMAGE_PULL_UPDATE, process_pull_image_log - ) - # Pull new image, passing credentials to aiodocker docker_image = await self.sys_docker.pull_image( - self.sys_jobs.current.uuid, + current_job.uuid, image, str(version), platform=MAP_ARCH[image_arch], @@ -438,8 +288,7 @@ class DockerInterface(JobGroup, ABC): f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error ) from err finally: - if listener: - self.sys_bus.remove_listener(listener) + self.sys_bus.remove_listener(listener) self._meta = docker_image diff --git a/supervisor/docker/pull_progress.py b/supervisor/docker/pull_progress.py new file mode 100644 index 000000000..69e967ed8 --- /dev/null +++ b/supervisor/docker/pull_progress.py @@ -0,0 +1,310 @@ +"""Image pull progress tracking.""" + +from __future__ import annotations + +from contextlib import suppress +from dataclasses import dataclass, field +from enum import Enum +import logging +from typing import TYPE_CHECKING, cast + +if TYPE_CHECKING: + from .manager import PullLogEntry + +_LOGGER = logging.getLogger(__name__) + +# Progress weight distribution: 70% downloading, 30% extraction +DOWNLOAD_WEIGHT = 70.0 +EXTRACT_WEIGHT = 30.0 + + +class LayerPullStatus(Enum): + """Status values for pulling an image layer. + + These are a subset of the statuses in a docker pull image log. + The order field allows comparing which stage is further along. + """ + + PULLING_FS_LAYER = 1, "Pulling fs layer" + WAITING = 1, "Waiting" + RETRYING = 2, "Retrying" # Matches "Retrying in N seconds" + DOWNLOADING = 3, "Downloading" + VERIFYING_CHECKSUM = 4, "Verifying Checksum" + DOWNLOAD_COMPLETE = 5, "Download complete" + EXTRACTING = 6, "Extracting" + PULL_COMPLETE = 7, "Pull complete" + ALREADY_EXISTS = 7, "Already exists" + + def __init__(self, order: int, status: str) -> None: + """Set fields from values.""" + self.order = order + self.status = status + + def __eq__(self, value: object, /) -> bool: + """Check equality, allow string comparisons on status.""" + with suppress(AttributeError): + return self.status == cast(LayerPullStatus, value).status + return self.status == value + + def __hash__(self) -> int: + """Return hash based on status string.""" + return hash(self.status) + + def __lt__(self, other: object) -> bool: + """Order instances by stage progression.""" + with suppress(AttributeError): + return self.order < cast(LayerPullStatus, other).order + return False + + @classmethod + def from_status(cls, status: str) -> LayerPullStatus | None: + """Get enum from status string, or None if not recognized.""" + # Handle "Retrying in N seconds" pattern + if status.startswith("Retrying in "): + return cls.RETRYING + for member in cls: + if member.status == status: + return member + return None + + +@dataclass +class LayerProgress: + """Track progress of a single layer.""" + + layer_id: str + total_size: int = 0 # Size in bytes (from downloading, reused for extraction) + download_current: int = 0 + extract_current: int = 0 # Extraction progress in bytes (overlay2 only) + download_complete: bool = False + extract_complete: bool = False + already_exists: bool = False # Layer was already locally available + + def calculate_progress(self) -> float: + """Calculate layer progress 0-100. + + Progress is weighted: 70% download, 30% extraction. + For overlay2, we have byte-based extraction progress. + For containerd, extraction jumps from 70% to 100% on completion. + """ + if self.already_exists or self.extract_complete: + return 100.0 + + if self.download_complete: + # Check if we have extraction progress (overlay2) + if self.extract_current > 0 and self.total_size > 0: + extract_pct = min(1.0, self.extract_current / self.total_size) + return DOWNLOAD_WEIGHT + (extract_pct * EXTRACT_WEIGHT) + # No extraction progress yet - return 70% + return DOWNLOAD_WEIGHT + + if self.total_size > 0: + download_pct = min(1.0, self.download_current / self.total_size) + return download_pct * DOWNLOAD_WEIGHT + + return 0.0 + + +@dataclass +class ImagePullProgress: + """Track overall progress of pulling an image. + + Uses count-based progress where each layer contributes equally regardless of size. + This avoids progress regression when large layers are discovered late due to + Docker's rate-limiting of concurrent downloads. + + Progress is only reported after the first "Downloading" event, since Docker + sends "Already exists" and "Pulling fs layer" events before we know the full + layer count. + """ + + layers: dict[str, LayerProgress] = field(default_factory=dict) + _last_reported_progress: float = field(default=0.0, repr=False) + _seen_downloading: bool = field(default=False, repr=False) + + def get_or_create_layer(self, layer_id: str) -> LayerProgress: + """Get existing layer or create new one.""" + if layer_id not in self.layers: + self.layers[layer_id] = LayerProgress(layer_id=layer_id) + return self.layers[layer_id] + + def process_event(self, entry: PullLogEntry) -> None: + """Process a pull log event and update layer state.""" + # Skip events without layer ID or status + if not entry.id or not entry.status: + return + + # Skip metadata events that aren't layer-specific + # "Pulling from X" has id=tag but isn't a layer + if entry.status.startswith("Pulling from "): + return + + # Parse status to enum (returns None for unrecognized statuses) + status = LayerPullStatus.from_status(entry.status) + if status is None: + return + + layer = self.get_or_create_layer(entry.id) + + # Handle "Already exists" - layer is locally available + if status is LayerPullStatus.ALREADY_EXISTS: + layer.already_exists = True + layer.download_complete = True + layer.extract_complete = True + return + + # Handle "Pulling fs layer" / "Waiting" - layer is being tracked + if status in (LayerPullStatus.PULLING_FS_LAYER, LayerPullStatus.WAITING): + return + + # Handle "Downloading" - update download progress + if status is LayerPullStatus.DOWNLOADING: + # Mark that we've seen downloading - now we know layer count is complete + self._seen_downloading = True + if ( + entry.progress_detail + and entry.progress_detail.current is not None + and entry.progress_detail.total is not None + ): + layer.download_current = entry.progress_detail.current + # Only set total_size if not already set or if this is larger + # (handles case where total changes during download) + layer.total_size = max(layer.total_size, entry.progress_detail.total) + return + + # Handle "Verifying Checksum" - download is essentially complete + if status is LayerPullStatus.VERIFYING_CHECKSUM: + if layer.total_size > 0: + layer.download_current = layer.total_size + return + + # Handle "Download complete" - download phase done + if status is LayerPullStatus.DOWNLOAD_COMPLETE: + layer.download_complete = True + if layer.total_size > 0: + layer.download_current = layer.total_size + elif layer.total_size == 0: + # Small layer that skipped downloading phase + # Set minimal size so it doesn't distort weighted average + layer.total_size = 1 + layer.download_current = 1 + return + + # Handle "Extracting" - extraction in progress + if status is LayerPullStatus.EXTRACTING: + # For overlay2: progressDetail has {current, total} in bytes + # For containerd: progressDetail has {current, units: "s"} (time elapsed) + # We can only use byte-based progress (overlay2) + layer.download_complete = True + if layer.total_size > 0: + layer.download_current = layer.total_size + + # Check if this is byte-based extraction progress (overlay2) + # Overlay2 has {current, total} in bytes, no units field + # Containerd has {current, units: "s"} which is useless for progress + if ( + entry.progress_detail + and entry.progress_detail.current is not None + and entry.progress_detail.units is None + ): + # Use layer's total_size from downloading phase (doesn't change) + layer.extract_current = entry.progress_detail.current + _LOGGER.debug( + "Layer %s extracting: %d/%d (%.1f%%)", + layer.layer_id, + layer.extract_current, + layer.total_size, + (layer.extract_current / layer.total_size * 100) + if layer.total_size > 0 + else 0, + ) + return + + # Handle "Pull complete" - layer is fully done + if status is LayerPullStatus.PULL_COMPLETE: + layer.download_complete = True + layer.extract_complete = True + if layer.total_size > 0: + layer.download_current = layer.total_size + return + + # Handle "Retrying in N seconds" - reset download progress + if status is LayerPullStatus.RETRYING: + layer.download_current = 0 + layer.download_complete = False + return + + def calculate_progress(self) -> float: + """Calculate overall progress 0-100. + + Uses count-based progress where each layer contributes equally. + Each layer's individual progress (0-100) is weighted by 1/total_layers. + This ensures progress never goes backwards when large layers appear late. + + Returns 0 until we've seen the first "Downloading" event, since Docker + reports "Already exists" and "Pulling fs layer" events before we know + the complete layer count. + """ + # Don't report progress until we've seen downloading start + # This ensures we know the full layer count before calculating progress + if not self._seen_downloading or not self.layers: + return 0.0 + + # Each layer contributes equally: sum of layer progresses / total layers + total_progress = sum( + layer.calculate_progress() for layer in self.layers.values() + ) + return total_progress / len(self.layers) + + def get_stage(self) -> str | None: + """Get current stage based on layer states.""" + if not self.layers: + return None + + # Check if any layer is still downloading + for layer in self.layers.values(): + if layer.already_exists: + continue + if not layer.download_complete: + return "Downloading" + + # All downloads complete, check if extracting + for layer in self.layers.values(): + if layer.already_exists: + continue + if not layer.extract_complete: + return "Extracting" + + # All done + return "Pull complete" + + def should_update_job(self, threshold: float = 1.0) -> tuple[bool, float]: + """Check if job should be updated based on progress change. + + Returns (should_update, current_progress). + Updates are triggered when progress changes by at least threshold%. + Progress is guaranteed to only increase (monotonic). + """ + current_progress = self.calculate_progress() + + # Ensure monotonic progress - never report a decrease + # This can happen when new layers get size info and change the weighted average + if current_progress < self._last_reported_progress: + _LOGGER.debug( + "Progress decreased from %.1f%% to %.1f%%, keeping last reported", + self._last_reported_progress, + current_progress, + ) + return False, self._last_reported_progress + + if current_progress >= self._last_reported_progress + threshold: + _LOGGER.debug( + "Progress update: %.1f%% -> %.1f%% (delta: %.1f%%)", + self._last_reported_progress, + current_progress, + current_progress - self._last_reported_progress, + ) + self._last_reported_progress = current_progress + return True, current_progress + + return False, self._last_reported_progress diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 3232b2e25..4b8950d58 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -632,10 +632,6 @@ class DockerNotFound(DockerError): """Docker object don't Exists.""" -class DockerLogOutOfOrder(DockerError): - """Raise when log from docker action was out of order.""" - - class DockerNoSpaceOnDevice(DockerError): """Raise if a docker pull fails due to available space.""" diff --git a/tests/docker/test_interface.py b/tests/docker/test_interface.py index 7221cf9a7..9fec1084e 100644 --- a/tests/docker/test_interface.py +++ b/tests/docker/test_interface.py @@ -709,11 +709,18 @@ async def test_install_progress_handles_layers_skipping_download( await install_task await event.wait() - # First update from layer download should have rather low progress ((260937/25445459) / 2 ~ 0.5%) - assert install_job_snapshots[0]["progress"] < 1 + # With the new progress calculation approach: + # - Progress is weighted by layer size + # - Small layers that skip downloading get minimal size (1 byte) + # - Progress should increase monotonically + assert len(install_job_snapshots) > 0 - # Total 8 events should lead to a progress update on the install job - assert len(install_job_snapshots) == 8 + # Verify progress is monotonically increasing (or stable) + for i in range(1, len(install_job_snapshots)): + assert ( + install_job_snapshots[i]["progress"] + >= install_job_snapshots[i - 1]["progress"] + ) # Job should complete successfully assert job.done is True @@ -844,24 +851,24 @@ async def test_install_progress_containerd_snapshot( } assert [c.args[0] for c in ha_ws_client.async_send_command.call_args_list] == [ - # During downloading we get continuous progress updates from download status + # Count-based progress: 2 layers, each = 50%. Download = 0-35%, Extract = 35-50% job_event(0), + job_event(1.7), job_event(3.4), - job_event(8.5), + job_event(8.4), job_event(10.2), - job_event(15.3), - job_event(18.8), - job_event(29.0), - job_event(35.8), - job_event(42.6), - job_event(49.5), - job_event(56.0), - job_event(62.8), - # Downloading phase is considered 70% of total. After we only get one update - # per image downloaded when extraction is finished. It uses the total size - # received during downloading to determine percent complete then. + job_event(15.2), + job_event(18.7), + job_event(28.8), + job_event(35.7), + job_event(42.4), + job_event(49.3), + job_event(55.8), + job_event(62.7), + # Downloading phase is considered 70% of layer's progress. + # After download complete, extraction takes remaining 30% per layer. job_event(70.0), - job_event(84.8), + job_event(85.0), job_event(100), job_event(100, True), ] diff --git a/tests/docker/test_pull_progress.py b/tests/docker/test_pull_progress.py new file mode 100644 index 000000000..a4132b8c7 --- /dev/null +++ b/tests/docker/test_pull_progress.py @@ -0,0 +1,788 @@ +"""Tests for image pull progress tracking.""" + +import pytest + +from supervisor.docker.manager import PullLogEntry, PullProgressDetail +from supervisor.docker.pull_progress import ( + DOWNLOAD_WEIGHT, + EXTRACT_WEIGHT, + ImagePullProgress, + LayerProgress, +) + + +class TestLayerProgress: + """Tests for LayerProgress class.""" + + def test_already_exists_layer(self): + """Test that already existing layer returns 100%.""" + layer = LayerProgress(layer_id="abc123", already_exists=True) + assert layer.calculate_progress() == 100.0 + + def test_extract_complete_layer(self): + """Test that extracted layer returns 100%.""" + layer = LayerProgress( + layer_id="abc123", + total_size=1000, + download_current=1000, + download_complete=True, + extract_complete=True, + ) + assert layer.calculate_progress() == 100.0 + + def test_download_complete_not_extracted(self): + """Test layer that finished downloading but not extracting.""" + layer = LayerProgress( + layer_id="abc123", + total_size=1000, + download_current=1000, + download_complete=True, + extract_complete=False, + ) + assert layer.calculate_progress() == DOWNLOAD_WEIGHT # 70% + + def test_extraction_progress_overlay2(self): + """Test layer with byte-based extraction progress (overlay2).""" + layer = LayerProgress( + layer_id="abc123", + total_size=1000, + download_current=1000, + extract_current=500, # 50% extracted + download_complete=True, + extract_complete=False, + ) + # 70% + (50% of 30%) = 70% + 15% = 85% + assert layer.calculate_progress() == DOWNLOAD_WEIGHT + (0.5 * EXTRACT_WEIGHT) + + def test_downloading_progress(self): + """Test layer during download phase.""" + layer = LayerProgress( + layer_id="abc123", + total_size=1000, + download_current=500, # 50% downloaded + download_complete=False, + ) + # 50% of 70% = 35% + assert layer.calculate_progress() == 35.0 + + def test_no_size_info_yet(self): + """Test layer with no size information.""" + layer = LayerProgress(layer_id="abc123") + assert layer.calculate_progress() == 0.0 + + +class TestImagePullProgress: + """Tests for ImagePullProgress class.""" + + def test_empty_progress(self): + """Test progress with no layers.""" + progress = ImagePullProgress() + assert progress.calculate_progress() == 0.0 + + def test_all_layers_already_exist(self): + """Test when all layers already exist locally. + + When an image is fully cached, there are no "Downloading" events. + Progress stays at 0 until the job completes and sets 100%. + """ + progress = ImagePullProgress() + + # Simulate "Already exists" events + entry1 = PullLogEntry( + job_id="test", + id="layer1", + status="Already exists", + progress_detail=PullProgressDetail(), + ) + entry2 = PullLogEntry( + job_id="test", + id="layer2", + status="Already exists", + progress_detail=PullProgressDetail(), + ) + progress.process_event(entry1) + progress.process_event(entry2) + + # No downloading events = no progress reported (job completion sets 100%) + assert progress.calculate_progress() == 0.0 + + def test_single_layer_download(self): + """Test progress tracking for single layer download.""" + progress = ImagePullProgress() + + # Pull fs layer + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Start downloading + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=500, total=1000), + ) + ) + # 50% of download phase = 35% + assert progress.calculate_progress() == pytest.approx(35.0) + + # Download complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Download complete", + progress_detail=PullProgressDetail(), + ) + ) + assert progress.calculate_progress() == 70.0 + + # Pull complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + assert progress.calculate_progress() == 100.0 + + def test_multiple_layers_equal_weight_progress(self): + """Test count-based progress where each layer contributes equally.""" + progress = ImagePullProgress() + + # Two layers: sizes don't matter for weight, each layer = 50% + + # Pulling fs layer for both + progress.process_event( + PullLogEntry( + job_id="test", + id="large", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="small", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Large layer: 50% downloaded = 35% layer progress (50% of 70%) + progress.process_event( + PullLogEntry( + job_id="test", + id="large", + status="Downloading", + progress_detail=PullProgressDetail(current=500, total=1000), + ) + ) + + # Small layer: 100% downloaded, waiting for extraction = 70% layer progress + progress.process_event( + PullLogEntry( + job_id="test", + id="small", + status="Download complete", + progress_detail=PullProgressDetail(), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="small", + status="Downloading", + progress_detail=PullProgressDetail(current=100, total=100), + ) + ) + + # Progress calculation (count-based, equal weight per layer): + # Large layer: 35% (50% of 70% download weight) + # Small layer: 70% (download complete) + # Each layer = 50% weight + # Total: (35 + 70) / 2 = 52.5% + assert progress.calculate_progress() == pytest.approx(52.5) + + def test_download_retry(self): + """Test that download retry resets progress.""" + progress = ImagePullProgress() + + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Download 50% + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=500, total=1000), + ) + ) + assert progress.calculate_progress() == pytest.approx(35.0) + + # Retry + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Retrying in 5 seconds", + ) + ) + assert progress.calculate_progress() == 0.0 + + def test_layer_skips_download(self): + """Test small layer that goes straight to Download complete.""" + progress = ImagePullProgress() + + progress.process_event( + PullLogEntry( + job_id="test", + id="small", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Goes directly to Download complete (skipping Downloading events) + progress.process_event( + PullLogEntry( + job_id="test", + id="small", + status="Download complete", + progress_detail=PullProgressDetail(), + ) + ) + + # Should still work - sets minimal size + layer = progress.layers["small"] + assert layer.total_size == 1 + assert layer.download_complete is True + + def test_containerd_extract_progress(self): + """Test extraction progress with containerd snapshotter (time-based).""" + progress = ImagePullProgress() + + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Download complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=1000, total=1000), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Download complete", + progress_detail=PullProgressDetail(), + ) + ) + + # Containerd extraction progress (time-based, not byte-based) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Extracting", + progress_detail=PullProgressDetail(current=5, units="s"), + ) + ) + + # Should be at 70% (download complete, time-based extraction not tracked) + assert progress.calculate_progress() == 70.0 + + # Pull complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + assert progress.calculate_progress() == 100.0 + + def test_overlay2_extract_progress(self): + """Test extraction progress with overlay2 (byte-based).""" + progress = ImagePullProgress() + + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Download complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=1000, total=1000), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Download complete", + progress_detail=PullProgressDetail(), + ) + ) + + # At download complete, progress should be 70% + assert progress.calculate_progress() == 70.0 + + # Overlay2 extraction progress (byte-based, 50% extracted) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Extracting", + progress_detail=PullProgressDetail(current=500, total=1000), + ) + ) + + # Should be at 70% + (50% of 30%) = 85% + assert progress.calculate_progress() == pytest.approx(85.0) + + # Extraction continues to 80% + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Extracting", + progress_detail=PullProgressDetail(current=800, total=1000), + ) + ) + + # Should be at 70% + (80% of 30%) = 94% + assert progress.calculate_progress() == pytest.approx(94.0) + + # Pull complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + assert progress.calculate_progress() == 100.0 + + def test_get_stage(self): + """Test stage detection.""" + progress = ImagePullProgress() + + assert progress.get_stage() is None + + # Add a layer that needs downloading + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=500, total=1000), + ) + ) + assert progress.get_stage() == "Downloading" + + # Download complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Download complete", + progress_detail=PullProgressDetail(), + ) + ) + assert progress.get_stage() == "Extracting" + + # Pull complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + assert progress.get_stage() == "Pull complete" + + def test_should_update_job(self): + """Test update threshold logic.""" + progress = ImagePullProgress() + + # Initial state - no updates + should_update, _ = progress.should_update_job() + assert not should_update + + # Add a layer and start downloading + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Small progress - 1% + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=20, total=1000), + ) + ) + # 2% of download = 1.4% total + should_update, current = progress.should_update_job() + assert should_update + assert current == pytest.approx(1.4) + + # Tiny increment - shouldn't trigger update + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=25, total=1000), + ) + ) + should_update, _ = progress.should_update_job() + assert not should_update + + # Larger increment - should trigger + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=100, total=1000), + ) + ) + should_update, _ = progress.should_update_job() + assert should_update + + def test_verifying_checksum(self): + """Test that Verifying Checksum marks download as nearly complete.""" + progress = ImagePullProgress() + + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=800, total=1000), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Verifying Checksum", + progress_detail=PullProgressDetail(), + ) + ) + + layer = progress.layers["layer1"] + assert layer.download_current == 1000 # Should be set to total + + def test_events_without_status_ignored(self): + """Test that events without status are ignored.""" + progress = ImagePullProgress() + + # Event without status (just id field) + progress.process_event( + PullLogEntry( + job_id="test", + id="abc123", + ) + ) + + # Event without id + progress.process_event( + PullLogEntry( + job_id="test", + status="Digest: sha256:abc123", + ) + ) + + # They shouldn't create layers or cause errors + assert len(progress.layers) == 0 + + def test_mixed_already_exists_and_pull(self): + """Test combination of cached and pulled layers.""" + progress = ImagePullProgress() + + # Layer 1 already exists + progress.process_event( + PullLogEntry( + job_id="test", + id="cached", + status="Already exists", + progress_detail=PullProgressDetail(), + ) + ) + + # Layer 2 needs to be pulled + progress.process_event( + PullLogEntry( + job_id="test", + id="pulled", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="pulled", + status="Downloading", + progress_detail=PullProgressDetail(current=500, total=1000), + ) + ) + + # Count-based: 2 layers total, each = 50% + # cached: 100% (already exists) + # pulled: 35% (50% of 70% download weight) + # Total: (100 + 35) / 2 = 67.5% + assert progress.calculate_progress() == pytest.approx(67.5) + + # Complete the pulled layer + progress.process_event( + PullLogEntry( + job_id="test", + id="pulled", + status="Download complete", + progress_detail=PullProgressDetail(), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="pulled", + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + + assert progress.calculate_progress() == 100.0 + + def test_pending_layers_prevent_premature_100(self): + """Test that layers without size info scale down progress.""" + progress = ImagePullProgress() + + # First batch of layers - they complete + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer2", + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Layer1 downloads and completes + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Downloading", + progress_detail=PullProgressDetail(current=1000, total=1000), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id="layer1", + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + + # Layer2 is still pending (no size info yet) - simulating Docker rate limiting + # Progress should NOT be 100% because layer2 hasn't started + + # Layer1 is 100% complete, layer2 is 0% + # With scaling: 1 known layer at 100%, 1 pending layer + # Scale factor = 1/(1+1) = 0.5, so progress = 100 * 0.5 = 50% + assert progress.calculate_progress() == pytest.approx(50.0) + + # Now layer2 starts downloading + progress.process_event( + PullLogEntry( + job_id="test", + id="layer2", + status="Downloading", + progress_detail=PullProgressDetail(current=500, total=1000), + ) + ) + + # Now both layers have size info, no scaling needed + # Layer1: 100%, Layer2: 35% (50% of 70%) + # Weighted by equal size: (100 + 35) / 2 = 67.5% + assert progress.calculate_progress() == pytest.approx(67.5) + + # Complete layer2 + progress.process_event( + PullLogEntry( + job_id="test", + id="layer2", + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + + assert progress.calculate_progress() == 100.0 + + def test_large_layers_appearing_late_dont_cause_regression(self): + """Test that large layers discovered late don't cause progress to drop. + + This simulates Docker's rate-limiting behavior where small layers complete + first, then large layers start downloading later. + """ + progress = ImagePullProgress() + + # All layers announced upfront (Docker does this) + for layer_id in ["small1", "small2", "big1", "big2"]: + progress.process_event( + PullLogEntry( + job_id="test", + id=layer_id, + status="Pulling fs layer", + progress_detail=PullProgressDetail(), + ) + ) + + # Big layers are "Waiting" (rate limited) + for layer_id in ["big1", "big2"]: + progress.process_event( + PullLogEntry( + job_id="test", + id=layer_id, + status="Waiting", + progress_detail=PullProgressDetail(), + ) + ) + + # Small layers download quickly (1KB each) + for layer_id in ["small1", "small2"]: + progress.process_event( + PullLogEntry( + job_id="test", + id=layer_id, + status="Downloading", + progress_detail=PullProgressDetail(current=1000, total=1000), + ) + ) + progress.process_event( + PullLogEntry( + job_id="test", + id=layer_id, + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + + # At this point, 2 small layers are complete, 2 big layers are unknown size + progress_before_big = progress.calculate_progress() + + # Now big layers start downloading - they're 100MB each! + progress.process_event( + PullLogEntry( + job_id="test", + id="big1", + status="Downloading", + progress_detail=PullProgressDetail(current=1000000, total=100000000), + ) + ) + + progress_after_big1 = progress.calculate_progress() + + # Progress should NOT drop significantly when big layer appears + # The monotonic tracking in should_update_job will help, but the + # raw calculation should also not regress too badly + assert progress_after_big1 >= progress_before_big * 0.5, ( + f"Progress dropped too much: {progress_before_big} -> {progress_after_big1}" + ) + + # Second big layer appears + progress.process_event( + PullLogEntry( + job_id="test", + id="big2", + status="Downloading", + progress_detail=PullProgressDetail(current=1000000, total=100000000), + ) + ) + + # Should still make forward progress overall + # Complete all layers + for layer_id in ["big1", "big2"]: + progress.process_event( + PullLogEntry( + job_id="test", + id=layer_id, + status="Pull complete", + progress_detail=PullProgressDetail(), + ) + ) + + assert progress.calculate_progress() == 100.0