mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-12-24 20:35:55 +00:00
Migrate images from dockerpy to aiodocker (#6252)
* Migrate images from dockerpy to aiodocker * Add missing coverage and fix bug in repair * Bind libraries to different files and refactor images.pull * Use the same socket again Try using the same socket again. * Fix pytest --------- Co-authored-by: Stefan Agner <stefan@agner.ch>
This commit is contained in:
@@ -1,13 +1,14 @@
|
||||
"""Common test functions."""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Sequence
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
from importlib import import_module
|
||||
from inspect import getclosurevars
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, Self
|
||||
|
||||
from dbus_fast.aio.message_bus import MessageBus
|
||||
|
||||
@@ -145,3 +146,22 @@ class MockResponse:
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
"""Exit the context manager."""
|
||||
|
||||
|
||||
class AsyncIterator:
|
||||
"""Make list/fixture into async iterator for test mocks."""
|
||||
|
||||
def __init__(self, seq: Sequence[Any]) -> None:
|
||||
"""Initialize with sequence."""
|
||||
self.iter = iter(seq)
|
||||
|
||||
def __aiter__(self) -> Self:
|
||||
"""Implement aiter."""
|
||||
return self
|
||||
|
||||
async def __anext__(self) -> Any:
|
||||
"""Return next in sequence."""
|
||||
try:
|
||||
return next(self.iter)
|
||||
except StopIteration:
|
||||
raise StopAsyncIteration() from None
|
||||
|
||||
Reference in New Issue
Block a user