Mirror of https://github.com/home-assistant/core.git (synced 2025-12-26 22:18:40 +00:00)
Add ZHA roller shades as cover instead of switch (#36059)
* Implement cover for "Shade" ZHA device type.
* Update ZHA cover tests.
* Add stop command.
* Coverage.
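For readers skimming the diff below: the new test_shade test drives the cover services against a ZHA "Shade" device (device_type 512 exposing the OnOff and LevelControl clusters) and asserts which ZCL command IDs get sent and how the 0-100 position maps to the 0-255 level. The following minimal sketch summarizes that mapping. It is derived only from the test assertions; the constant names, helper names, and rounding choice are illustrative and are not the actual homeassistant.components.zha.cover implementation.

# A minimal sketch, derived from the test assertions below; names and rounding
# are illustrative, not the actual ZHA cover implementation.

ONOFF_OFF = 0x0000  # cover.close_cover is expected to send OnOff command 0x0000 ("off")
ONOFF_ON = 0x0001  # cover.open_cover is expected to send OnOff command 0x0001 ("on")
LEVEL_MOVE_TO_LEVEL_WITH_ON_OFF = 0x0004  # cover.set_cover_position
LEVEL_STOP_COMMANDS = (0x0003, 0x0007)  # cover.stop_cover; the test accepts either ID


def position_to_level(position: int) -> int:
    """Scale a Home Assistant position (0-100) to a ZCL level (0-255)."""
    return round(position * 255 / 100)


def level_to_position(level: int) -> int:
    """Scale a ZCL level (0-255) back to a position (0-100), as the assertions do."""
    return int(level * 100 / 255)


# The set-position assertions check exactly this round trip for position 47.
assert level_to_position(position_to_level(47)) == 47

As a related note on the attribute reports fed through send_attributes_report in the tests: the dict keys are ZCL attribute IDs, and attribute 0x0000 is OnOff.on_off (respectively LevelControl.current_level), which is what toggles the entity between open/closed and updates the reported position.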
@@ -1,11 +1,22 @@
 """Test zha cover."""
+import asyncio
+
 import pytest
 import zigpy.types
 import zigpy.zcl.clusters.closures as closures
+import zigpy.zcl.clusters.general as general
 import zigpy.zcl.foundation as zcl_f
 
-from homeassistant.components.cover import DOMAIN
+from homeassistant.components.cover import (
+    ATTR_CURRENT_POSITION,
+    DOMAIN,
+    SERVICE_CLOSE_COVER,
+    SERVICE_OPEN_COVER,
+    SERVICE_SET_COVER_POSITION,
+    SERVICE_STOP_COVER,
+)
 from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE
+from homeassistant.core import CoreState, State
 
 from .common import (
     async_enable_traffic,
@@ -14,8 +25,8 @@ from .common import (
     send_attributes_report,
 )
 
-from tests.async_mock import MagicMock, call, patch
-from tests.common import mock_coro
+from tests.async_mock import AsyncMock, MagicMock, call, patch
+from tests.common import mock_coro, mock_restore_cache
 
 
 @pytest.fixture
@@ -32,6 +43,24 @@ def zigpy_cover_device(zigpy_device_mock):
     return zigpy_device_mock(endpoints)
 
 
+@pytest.fixture
+def zigpy_shade_device(zigpy_device_mock):
+    """Zigpy shade device."""
+
+    endpoints = {
+        1: {
+            "device_type": 512,
+            "in_clusters": [
+                closures.Shade.cluster_id,
+                general.LevelControl.cluster_id,
+                general.OnOff.cluster_id,
+            ],
+            "out_clusters": [],
+        }
+    }
+    return zigpy_device_mock(endpoints)
+
+
 @patch(
     "homeassistant.components.zha.core.channels.closures.WindowCovering.async_initialize"
 )
@@ -74,7 +103,7 @@ async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
         "zigpy.zcl.Cluster.request", return_value=mock_coro([0x1, zcl_f.Status.SUCCESS])
     ):
         await hass.services.async_call(
-            DOMAIN, "close_cover", {"entity_id": entity_id}, blocking=True
+            DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
         )
         assert cluster.request.call_count == 1
         assert cluster.request.call_args == call(
@@ -86,7 +115,7 @@ async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
         "zigpy.zcl.Cluster.request", return_value=mock_coro([0x0, zcl_f.Status.SUCCESS])
     ):
         await hass.services.async_call(
-            DOMAIN, "open_cover", {"entity_id": entity_id}, blocking=True
+            DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
        )
         assert cluster.request.call_count == 1
         assert cluster.request.call_args == call(
@@ -99,7 +128,7 @@ async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
     ):
         await hass.services.async_call(
             DOMAIN,
-            "set_cover_position",
+            SERVICE_SET_COVER_POSITION,
             {"entity_id": entity_id, "position": 47},
             blocking=True,
         )
@@ -119,7 +148,7 @@ async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
         "zigpy.zcl.Cluster.request", return_value=mock_coro([0x2, zcl_f.Status.SUCCESS])
     ):
         await hass.services.async_call(
-            DOMAIN, "stop_cover", {"entity_id": entity_id}, blocking=True
+            DOMAIN, SERVICE_STOP_COVER, {"entity_id": entity_id}, blocking=True
         )
         assert cluster.request.call_count == 1
         assert cluster.request.call_args == call(
@@ -129,3 +158,151 @@ async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
     # test rejoin
     await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,))
     assert hass.states.get(entity_id).state == STATE_OPEN
+
+
+async def test_shade(hass, zha_device_joined_restored, zigpy_shade_device):
+    """Test zha cover platform for shade device type."""
+
+    # load up cover domain
+    zha_device = await zha_device_joined_restored(zigpy_shade_device)
+
+    cluster_on_off = zigpy_shade_device.endpoints.get(1).on_off
+    cluster_level = zigpy_shade_device.endpoints.get(1).level
+    entity_id = await find_entity_id(DOMAIN, zha_device, hass)
+    assert entity_id is not None
+
+    # test that the cover was created and that it is unavailable
+    assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
+
+    # allow traffic to flow through the gateway and device
+    await async_enable_traffic(hass, [zha_device])
+    await hass.async_block_till_done()
+
+    # test that the state has changed from unavailable to off
+    await send_attributes_report(hass, cluster_on_off, {8: 0, 0: False, 1: 1})
+    assert hass.states.get(entity_id).state == STATE_CLOSED
+
+    # test to see if it opens
+    await send_attributes_report(hass, cluster_on_off, {8: 0, 0: True, 1: 1})
+    assert hass.states.get(entity_id).state == STATE_OPEN
+
+    # close from UI command fails
+    with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
+        await hass.services.async_call(
+            DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
+        )
+        assert cluster_on_off.request.call_count == 1
+        assert cluster_on_off.request.call_args[0][0] is False
+        assert cluster_on_off.request.call_args[0][1] == 0x0000
+        assert hass.states.get(entity_id).state == STATE_OPEN
+
+    with patch(
+        "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x1, zcl_f.Status.SUCCESS])
+    ):
+        await hass.services.async_call(
+            DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
+        )
+        assert cluster_on_off.request.call_count == 1
+        assert cluster_on_off.request.call_args[0][0] is False
+        assert cluster_on_off.request.call_args[0][1] == 0x0000
+        assert hass.states.get(entity_id).state == STATE_CLOSED
+
+    # open from UI command fails
+    assert ATTR_CURRENT_POSITION not in hass.states.get(entity_id).attributes
+    await send_attributes_report(hass, cluster_level, {0: 0})
+    with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
+        await hass.services.async_call(
+            DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
+        )
+        assert cluster_on_off.request.call_count == 1
+        assert cluster_on_off.request.call_args[0][0] is False
+        assert cluster_on_off.request.call_args[0][1] == 0x0001
+        assert hass.states.get(entity_id).state == STATE_CLOSED
+
+    # open from UI succeeds
+    with patch(
+        "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x0, zcl_f.Status.SUCCESS])
+    ):
+        await hass.services.async_call(
+            DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
+        )
+        assert cluster_on_off.request.call_count == 1
+        assert cluster_on_off.request.call_args[0][0] is False
+        assert cluster_on_off.request.call_args[0][1] == 0x0001
+        assert hass.states.get(entity_id).state == STATE_OPEN
+
+    # set position UI command fails
+    with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
+        await hass.services.async_call(
+            DOMAIN,
+            SERVICE_SET_COVER_POSITION,
+            {"entity_id": entity_id, "position": 47},
+            blocking=True,
+        )
+        assert cluster_level.request.call_count == 1
+        assert cluster_level.request.call_args[0][0] is False
+        assert cluster_level.request.call_args[0][1] == 0x0004
+        assert int(cluster_level.request.call_args[0][3] * 100 / 255) == 47
+        assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 0
+
+    # set position UI success
+    with patch(
+        "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x5, zcl_f.Status.SUCCESS])
+    ):
+        await hass.services.async_call(
+            DOMAIN,
+            SERVICE_SET_COVER_POSITION,
+            {"entity_id": entity_id, "position": 47},
+            blocking=True,
+        )
+        assert cluster_level.request.call_count == 1
+        assert cluster_level.request.call_args[0][0] is False
+        assert cluster_level.request.call_args[0][1] == 0x0004
+        assert int(cluster_level.request.call_args[0][3] * 100 / 255) == 47
+        assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 47
+
+    # report position change
+    await send_attributes_report(hass, cluster_level, {8: 0, 0: 100, 1: 1})
+    assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == int(
+        100 * 100 / 255
+    )
+
+    # test rejoin
+    await async_test_rejoin(
+        hass, zigpy_shade_device, [cluster_level, cluster_on_off], (1,)
+    )
+    assert hass.states.get(entity_id).state == STATE_OPEN
+
+    # test cover stop
+    with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
+        await hass.services.async_call(
+            DOMAIN, SERVICE_STOP_COVER, {"entity_id": entity_id}, blocking=True,
+        )
+        assert cluster_level.request.call_count == 1
+        assert cluster_level.request.call_args[0][0] is False
+        assert cluster_level.request.call_args[0][1] in (0x0003, 0x0007)
+
+
+async def test_restore_state(hass, zha_device_restored, zigpy_shade_device):
+    """Ensure states are restored on startup."""
+
+    mock_restore_cache(
+        hass,
+        (
+            State(
+                "cover.fakemanufacturer_fakemodel_e769900a_level_on_off_shade",
+                STATE_OPEN,
+                {ATTR_CURRENT_POSITION: 50},
+            ),
+        ),
+    )
+
+    hass.state = CoreState.starting
+
+    zha_device = await zha_device_restored(zigpy_shade_device)
+    entity_id = await find_entity_id(DOMAIN, zha_device, hass)
+    assert entity_id is not None
+
+    # test that the cover was created and that it is unavailable
+    assert hass.states.get(entity_id).state == STATE_OPEN
+    assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50