mirror of
https://github.com/home-assistant/core.git
synced 2026-07-04 21:25:26 +01:00
190 lines
6.1 KiB
Python
190 lines
6.1 KiB
Python
"""Tests for the syncthing sensor platform."""
|
|
|
|
from datetime import timedelta
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from aiosyncthing.exceptions import SyncthingError
|
|
import pytest
|
|
from syrupy.assertion import SnapshotAssertion
|
|
|
|
from homeassistant.components.syncthing.const import (
|
|
DOMAIN,
|
|
EVENTS,
|
|
SCAN_INTERVAL,
|
|
SERVER_AVAILABLE,
|
|
SERVER_UNAVAILABLE,
|
|
)
|
|
from homeassistant.config_entries import ConfigEntryState
|
|
from homeassistant.const import STATE_UNAVAILABLE
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import dispatcher, entity_registry as er
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from . import FOLDER_ID, SERVER_ID, SERVER_ID_SHORT_HA
|
|
|
|
from tests.common import (
|
|
MockConfigEntry,
|
|
async_fire_time_changed,
|
|
load_json_object_fixture,
|
|
snapshot_platform,
|
|
)
|
|
|
|
|
|
async def test_sensor_platform_setup(
|
|
hass: HomeAssistant,
|
|
entry: MockConfigEntry,
|
|
entity_registry: er.EntityRegistry,
|
|
snapshot: SnapshotAssertion,
|
|
mock_syncthing: MagicMock,
|
|
) -> None:
|
|
"""Test sensor platform sets up folder sensors."""
|
|
await snapshot_platform(hass, entity_registry, snapshot, entry.entry_id)
|
|
|
|
|
|
async def test_sensor_platform_no_sensors_on_config_error(
|
|
hass: HomeAssistant,
|
|
entry: MockConfigEntry,
|
|
entity_registry: er.EntityRegistry,
|
|
mock_syncthing_client: MagicMock,
|
|
) -> None:
|
|
"""Test sensor platform does not create folder sensors when config fetch fails."""
|
|
mock_syncthing_client.system.config.side_effect = SyncthingError("Connection error")
|
|
|
|
with patch(
|
|
"homeassistant.components.syncthing.aiosyncthing.Syncthing",
|
|
autospec=True,
|
|
) as mock_class:
|
|
mock_class.return_value = mock_syncthing_client
|
|
await hass.config_entries.async_setup(entry.entry_id)
|
|
|
|
assert entry.state is ConfigEntryState.LOADED
|
|
entity_id = entity_registry.async_get_entity_id(
|
|
"sensor", DOMAIN, f"{SERVER_ID_SHORT_HA}-{FOLDER_ID}"
|
|
)
|
|
assert entity_id is None
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"event_fixture",
|
|
[
|
|
"folder_summary_event.json",
|
|
"state_changed_event.json",
|
|
"folder_paused_event.json",
|
|
],
|
|
)
|
|
async def test_folder_sensor_updates_on_event(
|
|
hass: HomeAssistant,
|
|
entry: MockConfigEntry,
|
|
mock_syncthing: MagicMock,
|
|
entity_registry: er.EntityRegistry,
|
|
snapshot: SnapshotAssertion,
|
|
event_fixture: str,
|
|
) -> None:
|
|
"""Test folder sensor updates when receiving different events."""
|
|
event = await hass.async_add_executor_job(
|
|
load_json_object_fixture, event_fixture, DOMAIN
|
|
)
|
|
|
|
folder = event["data"].get("folder") or event["data"]["id"]
|
|
signal = f"{EVENTS[event['type']]}-{SERVER_ID}-{folder}"
|
|
|
|
dispatcher.async_dispatcher_send(hass, signal, event)
|
|
await hass.async_block_till_done()
|
|
await snapshot_platform(hass, entity_registry, snapshot, entry.entry_id)
|
|
|
|
|
|
async def test_folder_sensor_unavailable_on_server_unavailable(
|
|
hass: HomeAssistant,
|
|
entry: MockConfigEntry,
|
|
mock_syncthing: MagicMock,
|
|
entity_registry: er.EntityRegistry,
|
|
) -> None:
|
|
"""Test folder sensor becomes unavailable when server is unavailable."""
|
|
entity_id = entity_registry.async_get_entity_id(
|
|
"sensor", DOMAIN, f"{SERVER_ID_SHORT_HA}-{FOLDER_ID}"
|
|
)
|
|
state = hass.states.get(entity_id) if entity_id else None
|
|
assert state is not None and state.state == "idle"
|
|
|
|
dispatcher.async_dispatcher_send(
|
|
hass,
|
|
f"{SERVER_UNAVAILABLE}-{SERVER_ID}",
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id) if entity_id else None
|
|
assert state is not None and state.state == STATE_UNAVAILABLE
|
|
|
|
|
|
async def test_folder_sensor_available_on_server_available(
|
|
hass: HomeAssistant,
|
|
entry: MockConfigEntry,
|
|
mock_syncthing: MagicMock,
|
|
entity_registry: er.EntityRegistry,
|
|
) -> None:
|
|
"""Test folder sensor becomes available when server comes back online."""
|
|
dispatcher.async_dispatcher_send(
|
|
hass,
|
|
f"{SERVER_UNAVAILABLE}-{SERVER_ID}",
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
entity_id = entity_registry.async_get_entity_id(
|
|
"sensor", DOMAIN, f"{SERVER_ID_SHORT_HA}-{FOLDER_ID}"
|
|
)
|
|
state = hass.states.get(entity_id) if entity_id else None
|
|
assert state is not None and state.state == STATE_UNAVAILABLE
|
|
|
|
dispatcher.async_dispatcher_send(
|
|
hass,
|
|
f"{SERVER_AVAILABLE}-{SERVER_ID}",
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id) if entity_id else None
|
|
assert state is not None and state.state == "idle"
|
|
|
|
|
|
async def test_folder_sensor_polls_status(
|
|
hass: HomeAssistant,
|
|
entry: MockConfigEntry,
|
|
mock_syncthing: MagicMock,
|
|
entity_registry: er.EntityRegistry,
|
|
) -> None:
|
|
"""Test folder sensor polls for status updates."""
|
|
syncing_status = await hass.async_add_executor_job(
|
|
load_json_object_fixture, "folder_status.json", DOMAIN
|
|
)
|
|
syncing_status["state"] = "syncing"
|
|
mock_syncthing.database.status = AsyncMock(return_value=syncing_status)
|
|
|
|
future = dt_util.utcnow() + SCAN_INTERVAL + timedelta(seconds=1)
|
|
async_fire_time_changed(hass, future)
|
|
await hass.async_block_till_done()
|
|
|
|
entity_id = entity_registry.async_get_entity_id(
|
|
"sensor", DOMAIN, f"{SERVER_ID_SHORT_HA}-{FOLDER_ID}"
|
|
)
|
|
state = hass.states.get(entity_id) if entity_id else None
|
|
assert state is not None and state.state == "syncing"
|
|
|
|
|
|
async def test_folder_sensor_error_makes_unavailable(
|
|
hass: HomeAssistant,
|
|
entry: MockConfigEntry,
|
|
mock_syncthing: MagicMock,
|
|
entity_registry: er.EntityRegistry,
|
|
) -> None:
|
|
"""Test folder sensor becomes unavailable on status error."""
|
|
mock_syncthing.database.status = AsyncMock(side_effect=SyncthingError("Error"))
|
|
|
|
future = dt_util.utcnow() + SCAN_INTERVAL + timedelta(seconds=1)
|
|
async_fire_time_changed(hass, future)
|
|
await hass.async_block_till_done()
|
|
|
|
entity_id = entity_registry.async_get_entity_id(
|
|
"sensor", DOMAIN, f"{SERVER_ID_SHORT_HA}-{FOLDER_ID}"
|
|
)
|
|
state = hass.states.get(entity_id) if entity_id else None
|
|
assert state is not None and state.state == STATE_UNAVAILABLE
|