mirror of
https://github.com/home-assistant/core.git
synced 2026-04-02 08:26:41 +01:00
191 lines
6.0 KiB
Python
191 lines
6.0 KiB
Python
"""Test Teltonika sensor platform."""
|
|
|
|
from datetime import timedelta
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
from aiohttp import ClientResponseError
|
|
from freezegun.api import FrozenDateTimeFactory
|
|
import pytest
|
|
from syrupy.assertion import SnapshotAssertion
|
|
from teltasync import TeltonikaAuthenticationError, TeltonikaConnectionError
|
|
from teltasync.error_codes import TeltonikaErrorCode
|
|
|
|
from homeassistant.components.teltonika.const import DOMAIN
|
|
from homeassistant.config_entries import SOURCE_REAUTH
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import entity_registry as er
|
|
|
|
from tests.common import MockConfigEntry, async_fire_time_changed, snapshot_platform
|
|
|
|
|
|
async def test_sensors(
|
|
hass: HomeAssistant,
|
|
snapshot: SnapshotAssertion,
|
|
entity_registry: er.EntityRegistry,
|
|
init_integration: MockConfigEntry,
|
|
) -> None:
|
|
"""Test sensor entities match snapshot."""
|
|
await snapshot_platform(hass, entity_registry, snapshot, init_integration.entry_id)
|
|
|
|
|
|
async def test_sensor_modem_removed(
|
|
hass: HomeAssistant,
|
|
entity_registry: er.EntityRegistry,
|
|
init_integration: MockConfigEntry,
|
|
mock_modems: MagicMock,
|
|
freezer: FrozenDateTimeFactory,
|
|
) -> None:
|
|
"""Test sensor becomes unavailable when modem is removed."""
|
|
|
|
# Get initial sensor state
|
|
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert state is not None
|
|
|
|
# Update coordinator with empty modem data
|
|
mock_response = MagicMock()
|
|
mock_response.data = [] # No modems
|
|
mock_modems.get_status.return_value = mock_response
|
|
|
|
freezer.tick(timedelta(seconds=31))
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
# Check that entity is marked as unavailable
|
|
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert state is not None
|
|
|
|
# When modem is removed, entity should be marked as unavailable
|
|
# Verify through entity registry that entity exists but is unavailable
|
|
entity_entry = entity_registry.async_get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert entity_entry is not None
|
|
# State should show unavailable when modem is removed
|
|
assert state.state == "unavailable"
|
|
|
|
|
|
async def test_sensor_update_failure_and_recovery(
|
|
hass: HomeAssistant,
|
|
mock_modems: AsyncMock,
|
|
init_integration: MockConfigEntry,
|
|
freezer: FrozenDateTimeFactory,
|
|
) -> None:
|
|
"""Test sensor becomes unavailable on update failure and recovers."""
|
|
|
|
# Get initial sensor state, here it should be available
|
|
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert state is not None
|
|
assert state.state == "-63"
|
|
|
|
mock_modems.get_status.side_effect = TeltonikaConnectionError("Connection lost")
|
|
|
|
freezer.tick(timedelta(seconds=30))
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
# Sensor should now be unavailable
|
|
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert state is not None
|
|
assert state.state == "unavailable"
|
|
# Simulate recovery
|
|
mock_modems.get_status.side_effect = None
|
|
|
|
freezer.tick(timedelta(seconds=30))
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
# Sensor should be available again with correct data
|
|
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert state is not None
|
|
assert state.state == "-63"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("side_effect", "expect_reauth"),
|
|
[
|
|
(TeltonikaAuthenticationError("Invalid credentials"), True),
|
|
(
|
|
ClientResponseError(
|
|
request_info=MagicMock(),
|
|
history=(),
|
|
status=401,
|
|
message="Unauthorized",
|
|
headers={},
|
|
),
|
|
True,
|
|
),
|
|
(
|
|
ClientResponseError(
|
|
request_info=MagicMock(),
|
|
history=(),
|
|
status=500,
|
|
message="Server error",
|
|
headers={},
|
|
),
|
|
False,
|
|
),
|
|
],
|
|
ids=["auth_exception", "http_auth_error", "http_non_auth_error"],
|
|
)
|
|
async def test_sensor_update_exception_paths(
|
|
hass: HomeAssistant,
|
|
mock_modems: AsyncMock,
|
|
init_integration: MockConfigEntry,
|
|
freezer: FrozenDateTimeFactory,
|
|
side_effect: Exception,
|
|
expect_reauth: bool,
|
|
) -> None:
|
|
"""Test auth and non-auth exceptions during updates."""
|
|
mock_modems.get_status.side_effect = side_effect
|
|
|
|
freezer.tick(timedelta(seconds=31))
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert state is not None
|
|
assert state.state == "unavailable"
|
|
|
|
has_reauth = any(
|
|
flow["handler"] == DOMAIN and flow["context"]["source"] == SOURCE_REAUTH
|
|
for flow in hass.config_entries.flow.async_progress()
|
|
)
|
|
assert has_reauth is expect_reauth
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("error_code", "expect_reauth"),
|
|
[
|
|
(TeltonikaErrorCode.UNAUTHORIZED_ACCESS, True),
|
|
(999, False),
|
|
],
|
|
ids=["api_auth_error", "api_non_auth_error"],
|
|
)
|
|
async def test_sensor_update_unsuccessful_response_paths(
|
|
hass: HomeAssistant,
|
|
mock_modems: AsyncMock,
|
|
init_integration: MockConfigEntry,
|
|
freezer: FrozenDateTimeFactory,
|
|
error_code: int,
|
|
expect_reauth: bool,
|
|
) -> None:
|
|
"""Test unsuccessful API response handling."""
|
|
mock_modems.get_status.side_effect = None
|
|
mock_modems.get_status.return_value = MagicMock(
|
|
success=False,
|
|
data=None,
|
|
errors=[MagicMock(code=error_code, error="API error")],
|
|
)
|
|
|
|
freezer.tick(timedelta(seconds=31))
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get("sensor.rutx50_test_internal_modem_rssi")
|
|
assert state is not None
|
|
assert state.state == "unavailable"
|
|
|
|
has_reauth = any(
|
|
flow["handler"] == DOMAIN and flow["context"]["source"] == SOURCE_REAUTH
|
|
for flow in hass.config_entries.flow.async_progress()
|
|
)
|
|
assert has_reauth is expect_reauth
|