mirror of
https://github.com/home-assistant/core.git
synced 2026-02-15 07:36:16 +00:00
Shelly cover position update when moving (#139008)
Co-authored-by: Shay Levy <levyshay1@gmail.com>
This commit is contained in:
@@ -44,6 +44,9 @@ BLOCK_MAX_TRANSITION_TIME_MS: Final = 5000
|
||||
# min RPC light transition time in seconds (max=10800, limited by light entity to 6553)
|
||||
RPC_MIN_TRANSITION_TIME_SEC = 0.5
|
||||
|
||||
# time in seconds between two cover state updates when moving
|
||||
RPC_COVER_UPDATE_TIME_SEC = 1.0
|
||||
|
||||
RGBW_MODELS: Final = (
|
||||
MODEL_BULB,
|
||||
MODEL_RGBW2,
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any, cast
|
||||
|
||||
from aioshelly.block_device import Block
|
||||
@@ -17,6 +18,7 @@ from homeassistant.components.cover import (
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
|
||||
from .const import RPC_COVER_UPDATE_TIME_SEC
|
||||
from .coordinator import ShellyBlockCoordinator, ShellyConfigEntry, ShellyRpcCoordinator
|
||||
from .entity import ShellyBlockEntity, ShellyRpcEntity
|
||||
from .utils import get_device_entry_gen, get_rpc_key_ids
|
||||
@@ -158,6 +160,7 @@ class RpcShellyCover(ShellyRpcEntity, CoverEntity):
|
||||
"""Initialize rpc cover."""
|
||||
super().__init__(coordinator, f"cover:{id_}")
|
||||
self._id = id_
|
||||
self._update_task: asyncio.Task | None = None
|
||||
if self.status["pos_control"]:
|
||||
self._attr_supported_features |= CoverEntityFeature.SET_POSITION
|
||||
if coordinator.device.config[f"cover:{id_}"].get("slat", {}).get("enable"):
|
||||
@@ -199,6 +202,33 @@ class RpcShellyCover(ShellyRpcEntity, CoverEntity):
|
||||
"""Return if the cover is opening."""
|
||||
return cast(bool, self.status["state"] == "opening")
|
||||
|
||||
def launch_update_task(self) -> None:
|
||||
"""Launch the update position task if needed."""
|
||||
if not self._update_task or self._update_task.done():
|
||||
self._update_task = (
|
||||
self.coordinator.config_entry.async_create_background_task(
|
||||
self.hass,
|
||||
self.update_position(),
|
||||
f"Shelly cover update [{self._id} - {self.name}]",
|
||||
)
|
||||
)
|
||||
|
||||
async def update_position(self) -> None:
|
||||
"""Update the cover position every second."""
|
||||
try:
|
||||
while self.is_closing or self.is_opening:
|
||||
await self.coordinator.device.update_status()
|
||||
self.async_write_ha_state()
|
||||
await asyncio.sleep(RPC_COVER_UPDATE_TIME_SEC)
|
||||
finally:
|
||||
self._update_task = None
|
||||
|
||||
def _update_callback(self) -> None:
|
||||
"""Handle device update. Use a task when opening/closing is in progress."""
|
||||
super()._update_callback()
|
||||
if self.is_closing or self.is_opening:
|
||||
self.launch_update_task()
|
||||
|
||||
async def async_close_cover(self, **kwargs: Any) -> None:
|
||||
"""Close cover."""
|
||||
await self.call_rpc("Cover.Close", {"id": self._id})
|
||||
|
||||
@@ -100,10 +100,12 @@ async def mock_rest_update(
|
||||
|
||||
|
||||
async def mock_polling_rpc_update(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
seconds: float = RPC_SENSORS_POLLING_INTERVAL,
|
||||
) -> None:
|
||||
"""Move time to create polling RPC sensors update event."""
|
||||
freezer.tick(timedelta(seconds=RPC_SENSORS_POLLING_INTERVAL))
|
||||
freezer.tick(timedelta(seconds=seconds))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from copy import deepcopy
|
||||
from unittest.mock import Mock
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.cover import (
|
||||
@@ -21,11 +22,12 @@ from homeassistant.components.cover import (
|
||||
SERVICE_STOP_COVER_TILT,
|
||||
CoverState,
|
||||
)
|
||||
from homeassistant.components.shelly.const import RPC_COVER_UPDATE_TIME_SEC
|
||||
from homeassistant.const import ATTR_ENTITY_ID
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity_registry import EntityRegistry
|
||||
|
||||
from . import init_integration, mutate_rpc_device_status
|
||||
from . import init_integration, mock_polling_rpc_update, mutate_rpc_device_status
|
||||
|
||||
ROLLER_BLOCK_ID = 1
|
||||
|
||||
@@ -280,3 +282,138 @@ async def test_rpc_cover_tilt(
|
||||
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 10
|
||||
|
||||
|
||||
async def test_update_position_closing(
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
mock_rpc_device: Mock,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Test update_position while the cover is closing."""
|
||||
entity_id = "cover.test_name_test_cover_0"
|
||||
await init_integration(hass, 2)
|
||||
|
||||
# Set initial state to closing
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "state", "closing"
|
||||
)
|
||||
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "current_pos", 40)
|
||||
mock_rpc_device.mock_update()
|
||||
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.state == CoverState.CLOSING
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == 40
|
||||
|
||||
# Simulate position decrement
|
||||
async def simulated_update(*args, **kwargs):
|
||||
pos = mock_rpc_device.status["cover:0"]["current_pos"]
|
||||
if pos > 0:
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "current_pos", pos - 10
|
||||
)
|
||||
else:
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "current_pos", 0
|
||||
)
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "state", "closed"
|
||||
)
|
||||
|
||||
# Patching the mock update_status method
|
||||
monkeypatch.setattr(mock_rpc_device, "update_status", simulated_update)
|
||||
|
||||
# Simulate position updates during closing
|
||||
for position in range(40, -1, -10):
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == position
|
||||
assert state.state == CoverState.CLOSING
|
||||
await mock_polling_rpc_update(hass, freezer, RPC_COVER_UPDATE_TIME_SEC)
|
||||
|
||||
# Final state should be closed
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.state == CoverState.CLOSED
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == 0
|
||||
|
||||
|
||||
async def test_update_position_opening(
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
mock_rpc_device: Mock,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
"""Test update_position while the cover is opening."""
|
||||
entity_id = "cover.test_name_test_cover_0"
|
||||
await init_integration(hass, 2)
|
||||
|
||||
# Set initial state to opening at 60
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "state", "opening"
|
||||
)
|
||||
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "current_pos", 60)
|
||||
mock_rpc_device.mock_update()
|
||||
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.state == CoverState.OPENING
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == 60
|
||||
|
||||
# Simulate position increment
|
||||
async def simulated_update(*args, **kwargs):
|
||||
pos = mock_rpc_device.status["cover:0"]["current_pos"]
|
||||
if pos < 100:
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "current_pos", pos + 10
|
||||
)
|
||||
else:
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "current_pos", 100
|
||||
)
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "state", "open"
|
||||
)
|
||||
|
||||
# Patching the mock update_status method
|
||||
monkeypatch.setattr(mock_rpc_device, "update_status", simulated_update)
|
||||
|
||||
# Check position updates during opening
|
||||
for position in range(60, 101, 10):
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == position
|
||||
assert state.state == CoverState.OPENING
|
||||
await mock_polling_rpc_update(hass, freezer, RPC_COVER_UPDATE_TIME_SEC)
|
||||
|
||||
# Final state should be open
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.state == CoverState.OPEN
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == 100
|
||||
|
||||
|
||||
async def test_update_position_no_movement(
|
||||
hass: HomeAssistant, mock_rpc_device: Mock, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Test update_position when the cover is not moving."""
|
||||
entity_id = "cover.test_name_test_cover_0"
|
||||
await init_integration(hass, 2)
|
||||
|
||||
# Set initial state to open
|
||||
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "state", "open")
|
||||
mutate_rpc_device_status(
|
||||
monkeypatch, mock_rpc_device, "cover:0", "current_pos", 100
|
||||
)
|
||||
mock_rpc_device.mock_update()
|
||||
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.state == CoverState.OPEN
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == 100
|
||||
|
||||
# Call update_position and ensure no changes occur
|
||||
await hass.services.async_call(
|
||||
COVER_DOMAIN,
|
||||
SERVICE_OPEN_COVER,
|
||||
{ATTR_ENTITY_ID: entity_id},
|
||||
blocking=True,
|
||||
)
|
||||
|
||||
assert (state := hass.states.get(entity_id))
|
||||
assert state.state == CoverState.OPEN
|
||||
assert state.attributes[ATTR_CURRENT_POSITION] == 100
|
||||
|
||||
Reference in New Issue
Block a user