1
0
mirror of https://github.com/home-assistant/core.git synced 2026-02-28 13:56:28 +00:00

Add Shelly event translation (#156162)

Signed-off-by: David Rapan <david@rapan.cz>
This commit is contained in:
David Rapan
2025-11-15 19:52:13 +01:00
committed by GitHub
parent 5c0c7b9ec3
commit bdca592219
5 changed files with 145 additions and 39 deletions

View File

@@ -30,7 +30,10 @@ from .entity import ShellyBlockEntity, get_entity_rpc_device_info
from .utils import (
async_remove_orphaned_entities,
async_remove_shelly_entity,
get_block_channel,
get_block_custom_name,
get_device_entry_gen,
get_rpc_component_name,
get_rpc_entity_name,
get_rpc_key_instances,
is_block_momentary_input,
@@ -74,7 +77,6 @@ RPC_EVENT: Final = ShellyRpcEventDescription(
SCRIPT_EVENT: Final = ShellyRpcEventDescription(
key="script",
translation_key="script",
device_class=None,
entity_registry_enabled_default=False,
)
@@ -195,6 +197,17 @@ class ShellyBlockEvent(ShellyBlockEntity, EventEntity):
self._attr_event_types = list(BASIC_INPUTS_EVENTS_TYPES)
self.entity_description = description
if (
hasattr(self, "_attr_name")
and self._attr_name
and not get_block_custom_name(coordinator.device, block)
):
self._attr_translation_placeholders = {
"input_number": get_block_channel(block)
}
delattr(self, "_attr_name")
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
@@ -227,9 +240,20 @@ class ShellyRpcEvent(CoordinatorEntity[ShellyRpcCoordinator], EventEntity):
self.event_id = int(key.split(":")[-1])
self._attr_device_info = get_entity_rpc_device_info(coordinator, key)
self._attr_unique_id = f"{coordinator.mac}-{key}"
self._attr_name = get_rpc_entity_name(coordinator.device, key)
self.entity_description = description
if description.key == "input":
component = key.split(":")[0]
component_id = key.split(":")[-1]
if not get_rpc_component_name(coordinator.device, key) and (
component.lower() == "input" and component_id.isnumeric()
):
self._attr_translation_placeholders = {"input_number": component_id}
else:
self._attr_name = get_rpc_entity_name(coordinator.device, key)
elif description.key == "script":
self._attr_name = get_rpc_entity_name(coordinator.device, key)
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()

View File

@@ -168,6 +168,7 @@
},
"event": {
"input": {
"name": "Input {input_number}",
"state_attributes": {
"event_type": {
"state": {

View File

@@ -120,17 +120,35 @@ def get_number_of_channels(device: BlockDevice, block: Block) -> int:
def get_block_entity_name(
device: BlockDevice,
block: Block | None,
description: str | UndefinedType | None = None,
name: str | UndefinedType | None = None,
) -> str | None:
"""Naming for block based switch and sensors."""
channel_name = get_block_channel_name(device, block)
if description is not UNDEFINED and description:
return f"{channel_name} {description.lower()}" if channel_name else description
if name is not UNDEFINED and name:
return f"{channel_name} {name.lower()}" if channel_name else name
return channel_name
def get_block_custom_name(device: BlockDevice, block: Block | None) -> str | None:
"""Get custom name from device settings."""
if block and (key := cast(str, block.type) + "s") and key in device.settings:
assert block.channel
if name := device.settings[key][int(block.channel)].get("name"):
return cast(str, name)
return None
def get_block_channel(block: Block | None, base: str = "1") -> str:
"""Get block channel."""
assert block and block.channel
return chr(int(block.channel) + ord(base))
def get_block_channel_name(device: BlockDevice, block: Block | None) -> str | None:
"""Get name based on device and channel name."""
if (
@@ -140,19 +158,10 @@ def get_block_channel_name(device: BlockDevice, block: Block | None) -> str | No
):
return None
assert block.channel
if custom_name := get_block_custom_name(device, block):
return custom_name
channel_name: str | None = None
mode = cast(str, block.type) + "s"
if mode in device.settings:
channel_name = device.settings[mode][int(block.channel)].get("name")
if channel_name:
return channel_name
base = ord("1")
return f"Channel {chr(int(block.channel) + base)}"
return f"Channel {get_block_channel(block)}"
def get_block_sub_device_name(device: BlockDevice, block: Block) -> str:
@@ -160,18 +169,13 @@ def get_block_sub_device_name(device: BlockDevice, block: Block) -> str:
if TYPE_CHECKING:
assert block.channel
mode = cast(str, block.type) + "s"
if mode in device.settings:
if channel_name := device.settings[mode][int(block.channel)].get("name"):
return cast(str, channel_name)
if custom_name := get_block_custom_name(device, block):
return custom_name
if device.settings["device"]["type"] == MODEL_EM3:
base = ord("A")
return f"{device.name} Phase {chr(int(block.channel) + base)}"
return f"{device.name} Phase {get_block_channel(block, 'A')}"
base = ord("1")
return f"{device.name} Channel {chr(int(block.channel) + base)}"
return f"{device.name} Channel {get_block_channel(block)}"
def is_block_momentary_input(
@@ -387,6 +391,18 @@ def get_shelly_model_name(
return cast(str, MODEL_NAMES.get(model))
def get_rpc_component_name(device: RpcDevice, key: str) -> str | None:
"""Get component name from device config."""
if (
key in device.config
and key != "em:0" # workaround for Pro 3EM, we don't want to get name for em:0
and (name := device.config[key].get("name"))
):
return cast(str, name)
return None
def get_rpc_channel_name(device: RpcDevice, key: str) -> str | None:
"""Get name based on device and channel name."""
if BLU_TRV_IDENTIFIER in key:
@@ -398,13 +414,11 @@ def get_rpc_channel_name(device: RpcDevice, key: str) -> str | None:
component = key.split(":")[0]
component_id = key.split(":")[-1]
if key in device.config and key != "em:0":
# workaround for Pro 3EM, we don't want to get name for em:0
if component_name := device.config[key].get("name"):
if component in (*VIRTUAL_COMPONENTS, "input", "presencezone", "script"):
return cast(str, component_name)
if component_name := get_rpc_component_name(device, key):
if component in (*VIRTUAL_COMPONENTS, "input", "presencezone", "script"):
return component_name
return cast(str, component_name) if instances == 1 else None
return component_name if instances == 1 else None
if component in (*VIRTUAL_COMPONENTS, "input"):
return f"{component.title()} {component_id}"

View File

@@ -36,10 +36,28 @@ MOCK_SETTINGS = {
"mac": MOCK_MAC,
"hostname": "test-host",
"type": MODEL_25,
"num_inputs": 3,
"num_outputs": 2,
},
"coiot": {"update_period": 15},
"fw": "20201124-092159/v1.9.0@57ac4ad8",
"inputs": [
{
"name": "TV LEDs",
"btn_type": "momentary",
"btn_reverse": 0,
},
{
"name": "TV Spots",
"btn_type": "momentary",
"btn_reverse": 0,
},
{
"name": None,
"btn_type": "momentary",
"btn_reverse": 0,
},
],
"relays": [{"btn_type": "momentary"}, {"btn_type": "toggle"}],
"rollers": [{"positioning": True}],
"external_power": 0,
@@ -348,6 +366,7 @@ MOCK_SHELLY_COAP = {
"mac": MOCK_MAC,
"auth": False,
"fw": "20210715-092854/v1.11.0@57ac4ad8",
"num_inputs": 3,
"num_outputs": 2,
}

View File

@@ -1,5 +1,6 @@
"""Tests for Shelly button platform."""
import copy
from unittest.mock import Mock
from aioshelly.ble.const import BLE_SCRIPT_NAME
@@ -24,9 +25,14 @@ from . import (
patch_platforms,
register_entity,
)
from .conftest import MOCK_BLOCKS
DEVICE_BLOCK_ID = 4
UNORDERED_EVENT_TYPES = unordered(
["double", "long", "long_single", "single", "single_long", "triple"]
)
@pytest.fixture(autouse=True)
def fixture_platforms():
@@ -213,15 +219,57 @@ async def test_block_event(
assert state.attributes.get(ATTR_EVENT_TYPE) == "long"
async def test_block_event_single_output(
hass: HomeAssistant, mock_block_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test block device event when num_outputs is 1."""
monkeypatch.setitem(mock_block_device.shelly, "num_outputs", 1)
await init_integration(hass, 1)
assert hass.states.get("event.test_name")
async def test_block_event_shix3_1(
hass: HomeAssistant, mock_block_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test block device event for SHIX3-1."""
monkeypatch.setitem(mock_block_device.shelly, "num_outputs", 1)
await init_integration(hass, 1, model=MODEL_I3)
entity_id = "event.test_name"
assert (state := hass.states.get(entity_id))
assert state.attributes.get(ATTR_EVENT_TYPES) == unordered(
["double", "long", "long_single", "single", "single_long", "triple"]
blocks = copy.deepcopy(MOCK_BLOCKS)
blocks[0] = Mock(
sensor_ids={
"inputEvent": "S",
"inputEventCnt": 2,
},
channel="0",
type="input",
description="input_0",
)
blocks[1] = Mock(
sensor_ids={
"inputEvent": "S",
"inputEventCnt": 2,
},
channel="1",
type="input",
description="input_1",
)
blocks[2] = Mock(
sensor_ids={
"inputEvent": "S",
"inputEventCnt": 2,
},
channel="2",
type="input",
description="input_2",
)
monkeypatch.setattr(mock_block_device, "blocks", blocks)
monkeypatch.delitem(mock_block_device.settings, "relays")
await init_integration(hass, 1, model=MODEL_I3)
assert (state := hass.states.get("event.test_name_tv_leds"))
assert state.attributes.get(ATTR_EVENT_TYPES) == UNORDERED_EVENT_TYPES
assert (state := hass.states.get("event.test_name_tv_spots"))
assert state.attributes.get(ATTR_EVENT_TYPES) == UNORDERED_EVENT_TYPES
assert (state := hass.states.get("event.test_name_input_3"))
assert state.attributes.get(ATTR_EVENT_TYPES) == UNORDERED_EVENT_TYPES