mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-12-24 12:29:08 +00:00
Format data disk in Supervisor instead of OS Agent (#4212)
* Supervisor formats data disk instead of os agent * Fix issues occurring during tests * Can't migrate if target is too small
This commit is contained in:
@@ -3,7 +3,7 @@
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
|
||||
from dbus_fast.aio.message_bus import MessageBus
|
||||
from dbus_fast.service import method
|
||||
from dbus_fast.service import method, signal
|
||||
import pytest
|
||||
|
||||
from supervisor.dbus.const import DBUS_OBJECT_BASE
|
||||
@@ -24,6 +24,10 @@ class TestInterface(DBusServiceMock):
|
||||
def test(self, _: "b") -> None: # noqa: F821
|
||||
"""Do Test method."""
|
||||
|
||||
@signal(name="Test")
|
||||
def signal_test(self) -> None:
|
||||
"""Signal Test."""
|
||||
|
||||
|
||||
@pytest.fixture(name="test_service")
|
||||
async def fixture_test_service(dbus_session_bus: MessageBus) -> TestInterface:
|
||||
@@ -74,3 +78,96 @@ async def test_internal_dbus_errors(
|
||||
await test_obj.call_test(True)
|
||||
|
||||
capture_exception.assert_called_once_with(err)
|
||||
|
||||
|
||||
async def test_introspect(test_service: TestInterface, dbus_session_bus: MessageBus):
|
||||
"""Test introspect of dbus object."""
|
||||
test_obj = DBus(dbus_session_bus, "service.test.TestInterface", DBUS_OBJECT_BASE)
|
||||
|
||||
introspection = await test_obj.introspect()
|
||||
|
||||
assert {"service.test.TestInterface", "org.freedesktop.DBus.Properties"} <= {
|
||||
interface.name for interface in introspection.interfaces
|
||||
}
|
||||
test_interface = next(
|
||||
interface
|
||||
for interface in introspection.interfaces
|
||||
if interface.name == "service.test.TestInterface"
|
||||
)
|
||||
assert "Test" in {method_.name for method_ in test_interface.methods}
|
||||
|
||||
|
||||
async def test_init_proxy(test_service: TestInterface, dbus_session_bus: MessageBus):
|
||||
"""Test init proxy on already connected object to update interfaces."""
|
||||
test_obj = await DBus.connect(
|
||||
dbus_session_bus, "service.test.TestInterface", DBUS_OBJECT_BASE
|
||||
)
|
||||
orig_introspection = await test_obj.introspect()
|
||||
callback_count = 0
|
||||
|
||||
def test_callback():
|
||||
nonlocal callback_count
|
||||
callback_count += 1
|
||||
|
||||
class TestInterface2(TestInterface):
|
||||
"""Test interface 2."""
|
||||
|
||||
interface = "service.test.TestInterface.Test2"
|
||||
object_path = DBUS_OBJECT_BASE
|
||||
|
||||
# Test interfaces and methods match expected
|
||||
assert "service.test.TestInterface" in test_obj.proxies
|
||||
assert await test_obj.call_test(True) is None
|
||||
assert "service.test.TestInterface.Test2" not in test_obj.proxies
|
||||
|
||||
# Test basic signal listening works
|
||||
test_obj.on_test(test_callback)
|
||||
test_service.signal_test()
|
||||
await test_service.ping()
|
||||
assert callback_count == 1
|
||||
callback_count = 0
|
||||
|
||||
# Export the second interface and re-create proxy
|
||||
test_service_2 = TestInterface2()
|
||||
test_service_2.export(dbus_session_bus)
|
||||
|
||||
await test_obj.init_proxy()
|
||||
|
||||
# Test interfaces and methods match expected
|
||||
assert "service.test.TestInterface" in test_obj.proxies
|
||||
assert await test_obj.call_test(True) is None
|
||||
assert "service.test.TestInterface.Test2" in test_obj.proxies
|
||||
assert await test_obj.Test2.call_test(True) is None
|
||||
|
||||
# Test signal listening. First listener should still be attached
|
||||
test_obj.Test2.on_test(test_callback)
|
||||
test_service_2.signal_test()
|
||||
await test_service_2.ping()
|
||||
assert callback_count == 1
|
||||
|
||||
test_service.signal_test()
|
||||
await test_service.ping()
|
||||
assert callback_count == 2
|
||||
callback_count = 0
|
||||
|
||||
# Return to original introspection and test interfaces have reset
|
||||
await test_obj.init_proxy(introspection=orig_introspection)
|
||||
|
||||
assert "service.test.TestInterface" in test_obj.proxies
|
||||
assert "service.test.TestInterface.Test2" not in test_obj.proxies
|
||||
|
||||
# Signal listener for second interface should disconnect, first remains
|
||||
test_service_2.signal_test()
|
||||
await test_service_2.ping()
|
||||
assert callback_count == 0
|
||||
|
||||
test_service.signal_test()
|
||||
await test_service.ping()
|
||||
assert callback_count == 1
|
||||
callback_count = 0
|
||||
|
||||
# Should be able to disconnect first signal listener on new proxy obj
|
||||
test_obj.off_test(test_callback)
|
||||
test_service.signal_test()
|
||||
await test_service.ping()
|
||||
assert callback_count == 0
|
||||
|
||||
Reference in New Issue
Block a user