1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-24 21:06:19 +00:00

Create zwave_js.invoke_cc_api service (#70466)

This commit is contained in:
Raman Gupta
2022-04-26 11:30:49 -04:00
committed by GitHub
parent f84c33203b
commit 24b090a038
6 changed files with 391 additions and 42 deletions

View File

@@ -3,7 +3,7 @@ from unittest.mock import MagicMock, patch
import pytest
import voluptuous as vol
from zwave_js_server.exceptions import SetValueFailed
from zwave_js_server.exceptions import FailedZWaveCommand
from homeassistant.components.group import Group
from homeassistant.components.zwave_js.const import (
@@ -12,7 +12,10 @@ from homeassistant.components.zwave_js.const import (
ATTR_CONFIG_PARAMETER,
ATTR_CONFIG_PARAMETER_BITMASK,
ATTR_CONFIG_VALUE,
ATTR_ENDPOINT,
ATTR_METHOD_NAME,
ATTR_OPTIONS,
ATTR_PARAMETERS,
ATTR_PROPERTY,
ATTR_PROPERTY_KEY,
ATTR_REFRESH_ALL_VALUES,
@@ -20,6 +23,7 @@ from homeassistant.components.zwave_js.const import (
ATTR_WAIT_FOR_RESULT,
DOMAIN,
SERVICE_BULK_SET_PARTIAL_CONFIG_PARAMETERS,
SERVICE_INVOKE_CC_API,
SERVICE_MULTICAST_SET_VALUE,
SERVICE_PING,
SERVICE_REFRESH_VALUE,
@@ -28,6 +32,7 @@ from homeassistant.components.zwave_js.const import (
)
from homeassistant.components.zwave_js.helpers import get_device_id
from homeassistant.const import ATTR_AREA_ID, ATTR_DEVICE_ID, ATTR_ENTITY_ID
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.area_registry import async_get as async_get_area_reg
from homeassistant.helpers.device_registry import (
async_entries_for_config_entry,
@@ -982,7 +987,7 @@ async def test_set_value(hass, client, climate_danfoss_lc_13, integration):
# Test that when a command fails we raise an exception
client.async_send_command.return_value = {"success": False}
with pytest.raises(SetValueFailed):
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN,
SERVICE_SET_VALUE,
@@ -1335,7 +1340,7 @@ async def test_multicast_set_value(
# Test that when a command fails we raise an exception
client.async_send_command.return_value = {"success": False}
with pytest.raises(SetValueFailed):
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN,
SERVICE_MULTICAST_SET_VALUE,
@@ -1608,3 +1613,163 @@ async def test_ping(
{},
blocking=True,
)
async def test_invoke_cc_api(
hass,
client,
climate_danfoss_lc_13,
climate_radio_thermostat_ct100_plus_different_endpoints,
integration,
):
"""Test invoke_cc_api service."""
dev_reg = async_get_dev_reg(hass)
device_radio_thermostat = dev_reg.async_get_device(
{get_device_id(client, climate_radio_thermostat_ct100_plus_different_endpoints)}
)
assert device_radio_thermostat
device_danfoss = dev_reg.async_get_device(
{get_device_id(client, climate_danfoss_lc_13)}
)
assert device_danfoss
# Test successful invoke_cc_api call with a static endpoint
client.async_send_command.return_value = {"response": True}
client.async_send_command_no_wait.return_value = {"response": True}
await hass.services.async_call(
DOMAIN,
SERVICE_INVOKE_CC_API,
{
ATTR_DEVICE_ID: [
device_radio_thermostat.id,
device_danfoss.id,
],
ATTR_COMMAND_CLASS: 132,
ATTR_ENDPOINT: 0,
ATTR_METHOD_NAME: "someMethod",
ATTR_PARAMETERS: [1, 2],
},
blocking=True,
)
assert len(client.async_send_command.call_args_list) == 1
args = client.async_send_command.call_args[0][0]
assert args["command"] == "endpoint.invoke_cc_api"
assert args["commandClass"] == 132
assert args["endpoint"] == 0
assert args["methodName"] == "someMethod"
assert args["args"] == [1, 2]
assert (
args["nodeId"]
== climate_radio_thermostat_ct100_plus_different_endpoints.node_id
)
assert len(client.async_send_command_no_wait.call_args_list) == 1
args = client.async_send_command_no_wait.call_args[0][0]
assert args["command"] == "endpoint.invoke_cc_api"
assert args["commandClass"] == 132
assert args["endpoint"] == 0
assert args["methodName"] == "someMethod"
assert args["args"] == [1, 2]
assert args["nodeId"] == climate_danfoss_lc_13.node_id
client.async_send_command.reset_mock()
client.async_send_command_no_wait.reset_mock()
# Test successful invoke_cc_api call without an endpoint (include area)
area_reg = async_get_area_reg(hass)
area = area_reg.async_get_or_create("test")
dev_reg.async_update_device(device_danfoss.id, area_id=area.id)
client.async_send_command.return_value = {"response": True}
client.async_send_command_no_wait.return_value = {"response": True}
await hass.services.async_call(
DOMAIN,
SERVICE_INVOKE_CC_API,
{
ATTR_AREA_ID: area.id,
ATTR_DEVICE_ID: [
device_radio_thermostat.id,
"fake_device_id",
],
ATTR_ENTITY_ID: [
"sensor.not_real",
"select.living_connect_z_thermostat_local_protection_state",
"sensor.living_connect_z_thermostat_node_status",
],
ATTR_COMMAND_CLASS: 132,
ATTR_METHOD_NAME: "someMethod",
ATTR_PARAMETERS: [1, 2],
},
blocking=True,
)
assert len(client.async_send_command.call_args_list) == 1
args = client.async_send_command.call_args[0][0]
assert args["command"] == "endpoint.invoke_cc_api"
assert args["commandClass"] == 132
assert args["endpoint"] == 0
assert args["methodName"] == "someMethod"
assert args["args"] == [1, 2]
assert (
args["nodeId"]
== climate_radio_thermostat_ct100_plus_different_endpoints.node_id
)
assert len(client.async_send_command_no_wait.call_args_list) == 1
args = client.async_send_command_no_wait.call_args[0][0]
assert args["command"] == "endpoint.invoke_cc_api"
assert args["commandClass"] == 132
assert args["endpoint"] == 0
assert args["methodName"] == "someMethod"
assert args["args"] == [1, 2]
assert args["nodeId"] == climate_danfoss_lc_13.node_id
client.async_send_command.reset_mock()
client.async_send_command_no_wait.reset_mock()
# Test failed invoke_cc_api call on one node
client.async_send_command.return_value = {"response": True}
client.async_send_command_no_wait.side_effect = FailedZWaveCommand(
"test", 12, "test"
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN,
SERVICE_INVOKE_CC_API,
{
ATTR_DEVICE_ID: [
device_radio_thermostat.id,
device_danfoss.id,
],
ATTR_COMMAND_CLASS: 132,
ATTR_ENDPOINT: 0,
ATTR_METHOD_NAME: "someMethod",
ATTR_PARAMETERS: [1, 2],
},
blocking=True,
)
assert len(client.async_send_command.call_args_list) == 1
args = client.async_send_command.call_args[0][0]
assert args["command"] == "endpoint.invoke_cc_api"
assert args["commandClass"] == 132
assert args["endpoint"] == 0
assert args["methodName"] == "someMethod"
assert args["args"] == [1, 2]
assert (
args["nodeId"]
== climate_radio_thermostat_ct100_plus_different_endpoints.node_id
)
assert len(client.async_send_command_no_wait.call_args_list) == 1
args = client.async_send_command_no_wait.call_args[0][0]
assert args["command"] == "endpoint.invoke_cc_api"
assert args["commandClass"] == 132
assert args["endpoint"] == 0
assert args["methodName"] == "someMethod"
assert args["args"] == [1, 2]
assert args["nodeId"] == climate_danfoss_lc_13.node_id
client.async_send_command.reset_mock()
client.async_send_command_no_wait.reset_mock()