mirror of
https://github.com/home-assistant/core.git
synced 2025-12-25 05:26:47 +00:00
Allow sending GroupValueResponse telegrams with knx.send service (#62639)
* Add knx.respond service * Combine knx.send and knx.respond services * Rename knx attribute and fix tests * Use parametrization in tests Co-authored-by: Marvin Wichmann <marvin.wichmann@unic.com>
This commit is contained in:
@@ -1,4 +1,7 @@
|
||||
"""Test KNX services."""
|
||||
import pytest
|
||||
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
|
||||
|
||||
from homeassistant.const import STATE_OFF, STATE_ON
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
@@ -7,51 +10,114 @@ from .conftest import KNXTestKit
|
||||
from tests.common import async_capture_events
|
||||
|
||||
|
||||
async def test_send(hass: HomeAssistant, knx: KNXTestKit):
|
||||
@pytest.mark.parametrize(
|
||||
"service_payload,expected_telegrams,expected_apci",
|
||||
[
|
||||
# send DPT 1 telegram
|
||||
(
|
||||
{"address": "1/2/3", "payload": True, "response": True},
|
||||
[("1/2/3", True)],
|
||||
GroupValueResponse,
|
||||
),
|
||||
(
|
||||
{"address": "1/2/3", "payload": True, "response": False},
|
||||
[("1/2/3", True)],
|
||||
GroupValueWrite,
|
||||
),
|
||||
# send DPT 5 telegram
|
||||
(
|
||||
{"address": "1/2/3", "payload": [99], "response": True},
|
||||
[("1/2/3", (99,))],
|
||||
GroupValueResponse,
|
||||
),
|
||||
(
|
||||
{"address": "1/2/3", "payload": [99], "response": False},
|
||||
[("1/2/3", (99,))],
|
||||
GroupValueWrite,
|
||||
),
|
||||
# send DPT 5 percent telegram
|
||||
(
|
||||
{"address": "1/2/3", "payload": 99, "type": "percent", "response": True},
|
||||
[("1/2/3", (0xFC,))],
|
||||
GroupValueResponse,
|
||||
),
|
||||
(
|
||||
{"address": "1/2/3", "payload": 99, "type": "percent", "response": False},
|
||||
[("1/2/3", (0xFC,))],
|
||||
GroupValueWrite,
|
||||
),
|
||||
# send temperature DPT 9 telegram
|
||||
(
|
||||
{
|
||||
"address": "1/2/3",
|
||||
"payload": 21.0,
|
||||
"type": "temperature",
|
||||
"response": True,
|
||||
},
|
||||
[("1/2/3", (0x0C, 0x1A))],
|
||||
GroupValueResponse,
|
||||
),
|
||||
(
|
||||
{
|
||||
"address": "1/2/3",
|
||||
"payload": 21.0,
|
||||
"type": "temperature",
|
||||
"response": False,
|
||||
},
|
||||
[("1/2/3", (0x0C, 0x1A))],
|
||||
GroupValueWrite,
|
||||
),
|
||||
# send multiple telegrams
|
||||
(
|
||||
{
|
||||
"address": ["1/2/3", "2/2/2", "3/3/3"],
|
||||
"payload": 99,
|
||||
"type": "percent",
|
||||
"response": True,
|
||||
},
|
||||
[
|
||||
("1/2/3", (0xFC,)),
|
||||
("2/2/2", (0xFC,)),
|
||||
("3/3/3", (0xFC,)),
|
||||
],
|
||||
GroupValueResponse,
|
||||
),
|
||||
(
|
||||
{
|
||||
"address": ["1/2/3", "2/2/2", "3/3/3"],
|
||||
"payload": 99,
|
||||
"type": "percent",
|
||||
"response": False,
|
||||
},
|
||||
[
|
||||
("1/2/3", (0xFC,)),
|
||||
("2/2/2", (0xFC,)),
|
||||
("3/3/3", (0xFC,)),
|
||||
],
|
||||
GroupValueWrite,
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_send(
|
||||
hass: HomeAssistant,
|
||||
knx: KNXTestKit,
|
||||
service_payload,
|
||||
expected_telegrams,
|
||||
expected_apci,
|
||||
):
|
||||
"""Test `knx.send` service."""
|
||||
test_address = "1/2/3"
|
||||
await knx.setup_integration({})
|
||||
|
||||
# send DPT 1 telegram
|
||||
await hass.services.async_call(
|
||||
"knx", "send", {"address": test_address, "payload": True}, blocking=True
|
||||
)
|
||||
await knx.assert_write(test_address, True)
|
||||
|
||||
# send raw DPT 5 telegram
|
||||
await hass.services.async_call(
|
||||
"knx", "send", {"address": test_address, "payload": [99]}, blocking=True
|
||||
)
|
||||
await knx.assert_write(test_address, (99,))
|
||||
|
||||
# send "percent" DPT 5 telegram
|
||||
await hass.services.async_call(
|
||||
"knx",
|
||||
"send",
|
||||
{"address": test_address, "payload": 99, "type": "percent"},
|
||||
service_payload,
|
||||
blocking=True,
|
||||
)
|
||||
await knx.assert_write(test_address, (0xFC,))
|
||||
|
||||
# send "temperature" DPT 9 telegram
|
||||
await hass.services.async_call(
|
||||
"knx",
|
||||
"send",
|
||||
{"address": test_address, "payload": 21.0, "type": "temperature"},
|
||||
blocking=True,
|
||||
)
|
||||
await knx.assert_write(test_address, (0x0C, 0x1A))
|
||||
|
||||
# send multiple telegrams
|
||||
await hass.services.async_call(
|
||||
"knx",
|
||||
"send",
|
||||
{"address": [test_address, "2/2/2", "3/3/3"], "payload": 99, "type": "percent"},
|
||||
blocking=True,
|
||||
)
|
||||
await knx.assert_write(test_address, (0xFC,))
|
||||
await knx.assert_write("2/2/2", (0xFC,))
|
||||
await knx.assert_write("3/3/3", (0xFC,))
|
||||
for expected_response in expected_telegrams:
|
||||
group_address, payload = expected_response
|
||||
await knx.assert_telegram(group_address, payload, expected_apci)
|
||||
|
||||
|
||||
async def test_read(hass: HomeAssistant, knx: KNXTestKit):
|
||||
|
||||
Reference in New Issue
Block a user