1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2025-12-24 12:29:08 +00:00
Files
supervisor/tests/dbus/udisks2/test_filesystem.py
Raman Gupta 4c2d729646 Add udisks2 dbus support (#3848)
* Add udisks2 dbus support

* assert mountpoints

* Comment

* Add reference links

* docstring

* fix type

* fix type

* add typing extensions as import

* isort

* additional changes

* Simplify classes and conversions, fix bugs

* More simplification

* Fix imports

* fix pip

* Add additional properties and fix requirements

* fix tests maybe

* Handle optionality of certain configuration details

* black

* connect to devices before returning them

* Refactor for latest dbus work

* Not .items

* fix mountpoints logic

* use variants

* Use variants for options too

* isort

* Switch to dbus fast

* Move import to parent

* Add some fixture data

* Add another fixture and reduce the block devices list

* Implement changes discussed with mike

* Add property fixtures

* update object path

* Fix get_block_devices call

* Tests and refactor to minimize dbus reconnects

* Call super init in DBusInterfaceProxy

* Fix permissions on introspection files

---------

Co-authored-by: Mike Degatano <michael.degatano@gmail.com>
2023-02-15 08:17:29 +01:00

85 lines
2.7 KiB
Python

"""Test UDisks2 Filesystem."""
import asyncio
from pathlib import Path
from dbus_fast.aio.message_bus import MessageBus
import pytest
from supervisor.dbus.udisks2.data import MountOptions, UnmountOptions
from supervisor.dbus.udisks2.filesystem import UDisks2Filesystem
from tests.common import fire_property_change_signal
@pytest.fixture(name="sda1")
async def fixture_sda1(dbus: list[str], dbus_bus: MessageBus) -> UDisks2Filesystem:
    """Provide a UDisks2Filesystem for the sda1 block device, already connected.

    The object is connected to ``dbus_bus`` before it is handed to the test.
    """
    # NOTE(review): `dbus` is requested but not used here; presumably it sets
    # up the D-Bus test double - confirm against the shared conftest.
    sda1_fs = UDisks2Filesystem("/org/freedesktop/UDisks2/block_devices/sda1")
    await sda1_fs.connect(dbus_bus)
    return sda1_fs
async def test_filesystem_info(dbus: list[str], dbus_bus: MessageBus):
    """Verify size and mount point properties around connect and change signals."""

    def mount_points_changed() -> dict:
        # Build a fresh payload per call so the two signals share no objects.
        return {"MountPoints": [bytearray("/mnt/media", encoding="utf-8")]}

    sda1_fs = UDisks2Filesystem("/org/freedesktop/UDisks2/block_devices/sda1")
    sdb1_fs = UDisks2Filesystem(
        "/org/freedesktop/UDisks2/block_devices/sdb1", sync_properties=False
    )

    # Before connecting, neither object exposes any property values.
    for filesystem in (sda1_fs, sdb1_fs):
        assert filesystem.size is None
        assert filesystem.mount_points is None

    for filesystem in (sda1_fs, sdb1_fs):
        await filesystem.connect(dbus_bus)

    assert sda1_fs.size == 250058113024
    assert sda1_fs.mount_points == []
    assert sdb1_fs.size == 67108864
    assert sdb1_fs.mount_points == [Path("/mnt/data/supervisor/media/ext")]

    fire_property_change_signal(sda1_fs, mount_points_changed())
    # Yield to the event loop once before re-reading the property.
    await asyncio.sleep(0)
    assert sda1_fs.mount_points == [Path("/mnt/media")]

    # sdb1 was created with sync_properties=False; firing a change at it must raise.
    with pytest.raises(AssertionError):
        fire_property_change_signal(sdb1_fs, mount_points_changed())
async def test_mount(dbus: list[str], sda1: UDisks2Filesystem):
    """Mount sda1 and check the returned path and the contents of `dbus`."""
    mount_path = await sda1.mount(MountOptions(fstype="gpt"))
    assert mount_path == "/run/media/dev/hassos_data"

    # After the call, the `dbus` list must hold exactly the Mount entry.
    expected_calls = [
        "/org/freedesktop/UDisks2/block_devices/sda1-org.freedesktop.UDisks2.Filesystem.Mount"
    ]
    assert dbus == expected_calls
async def test_unmount(dbus: list[str], sda1: UDisks2Filesystem):
    """Force-unmount sda1 and check the contents of `dbus`."""
    options = UnmountOptions(force=True)
    await sda1.unmount(options)

    # After the call, the `dbus` list must hold exactly the Unmount entry.
    expected_calls = [
        "/org/freedesktop/UDisks2/block_devices/sda1-org.freedesktop.UDisks2.Filesystem.Unmount"
    ]
    assert dbus == expected_calls
async def test_check(dbus: list[str], sda1: UDisks2Filesystem):
    """Run a check on sda1 and verify it returns True and the contents of `dbus`."""
    check_result = await sda1.check()
    assert check_result is True

    # After the call, the `dbus` list must hold exactly the Check entry.
    expected_calls = [
        "/org/freedesktop/UDisks2/block_devices/sda1-org.freedesktop.UDisks2.Filesystem.Check"
    ]
    assert dbus == expected_calls
async def test_repair(dbus: list[str], sda1: UDisks2Filesystem):
    """Run a repair on sda1 and verify it returns True and the contents of `dbus`."""
    repair_result = await sda1.repair()
    assert repair_result is True

    # After the call, the `dbus` list must hold exactly the Repair entry.
    expected_calls = [
        "/org/freedesktop/UDisks2/block_devices/sda1-org.freedesktop.UDisks2.Filesystem.Repair"
    ]
    assert dbus == expected_calls