mirror of
https://github.com/home-assistant/core.git
synced 2026-04-17 15:44:52 +01:00
Replace enocean library (#164272)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
"""Support for EnOcean devices."""
|
||||
|
||||
from serial import SerialException
|
||||
from enocean_async import Gateway
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
||||
@@ -8,12 +8,15 @@ from homeassistant.const import CONF_DEVICE
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.dispatcher import (
|
||||
async_dispatcher_connect,
|
||||
async_dispatcher_send,
|
||||
)
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from .const import DOMAIN
|
||||
from .dongle import EnOceanDongle
|
||||
from .const import DOMAIN, SIGNAL_RECEIVE_MESSAGE, SIGNAL_SEND_MESSAGE
|
||||
|
||||
type EnOceanConfigEntry = ConfigEntry[EnOceanDongle]
|
||||
type EnOceanConfigEntry = ConfigEntry[Gateway]
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema(
|
||||
{DOMAIN: vol.Schema({vol.Required(CONF_DEVICE): cv.string})}, extra=vol.ALLOW_EXTRA
|
||||
@@ -27,7 +30,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
return True
|
||||
|
||||
if hass.config_entries.async_entries(DOMAIN):
|
||||
# We can only have one dongle. If there is already one in the config,
|
||||
# We can only have one gateway. If there is already one in the config,
|
||||
# there is no need to import the yaml based config.
|
||||
return True
|
||||
|
||||
@@ -43,23 +46,31 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant, config_entry: EnOceanConfigEntry
|
||||
) -> bool:
|
||||
"""Set up an EnOcean dongle for the given entry."""
|
||||
try:
|
||||
usb_dongle = EnOceanDongle(hass, config_entry.data[CONF_DEVICE])
|
||||
except SerialException as err:
|
||||
raise ConfigEntryNotReady(f"Failed to set up EnOcean dongle: {err}") from err
|
||||
await usb_dongle.async_setup()
|
||||
config_entry.runtime_data = usb_dongle
|
||||
"""Set up an EnOcean gateway for the given entry."""
|
||||
gateway = Gateway(port=config_entry.data[CONF_DEVICE])
|
||||
|
||||
gateway.add_erp1_received_callback(
|
||||
lambda packet: async_dispatcher_send(hass, SIGNAL_RECEIVE_MESSAGE, packet)
|
||||
)
|
||||
|
||||
try:
|
||||
await gateway.start()
|
||||
except ConnectionError as err:
|
||||
gateway.stop()
|
||||
raise ConfigEntryNotReady(f"Failed to start EnOcean gateway: {err}") from err
|
||||
|
||||
config_entry.runtime_data = gateway
|
||||
|
||||
config_entry.async_on_unload(
|
||||
async_dispatcher_connect(hass, SIGNAL_SEND_MESSAGE, gateway.send_esp3_packet)
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
async def async_unload_entry(
|
||||
hass: HomeAssistant, config_entry: EnOceanConfigEntry
|
||||
) -> bool:
|
||||
"""Unload EnOcean config entry."""
|
||||
|
||||
enocean_dongle = config_entry.runtime_data
|
||||
enocean_dongle.unload()
|
||||
"""Unload EnOcean config entry: stop the gateway."""
|
||||
|
||||
config_entry.runtime_data.stop()
|
||||
return True
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enocean.utils import combine_hex
|
||||
from enocean_async import ERP1Telegram
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.binary_sensor import (
|
||||
@@ -17,7 +17,7 @@ from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from .entity import EnOceanEntity
|
||||
from .entity import EnOceanEntity, combine_hex
|
||||
|
||||
DEFAULT_NAME = "EnOcean binary sensor"
|
||||
DEPENDENCIES = ["enocean"]
|
||||
@@ -68,29 +68,25 @@ class EnOceanBinarySensor(EnOceanEntity, BinarySensorEntity):
|
||||
self._attr_unique_id = f"{combine_hex(dev_id)}-{device_class}"
|
||||
self._attr_name = dev_name
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Fire an event with the data that have changed.
|
||||
|
||||
This method is called when there is an incoming packet associated
|
||||
with this platform.
|
||||
|
||||
Example packet data:
|
||||
- 2nd button pressed
|
||||
['0xf6', '0x10', '0x00', '0x2d', '0xcf', '0x45', '0x30']
|
||||
- button released
|
||||
['0xf6', '0x00', '0x00', '0x2d', '0xcf', '0x45', '0x20']
|
||||
"""
|
||||
if not self.address:
|
||||
return
|
||||
# Energy Bow
|
||||
pushed = None
|
||||
|
||||
if packet.data[6] == 0x30:
|
||||
if telegram.status == 0x30:
|
||||
pushed = 1
|
||||
elif packet.data[6] == 0x20:
|
||||
elif telegram.status == 0x20:
|
||||
pushed = 0
|
||||
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
action = packet.data[1]
|
||||
action = telegram.telegram_data[0]
|
||||
if action == 0x70:
|
||||
self.which = 0
|
||||
self.onoff = 0
|
||||
@@ -112,7 +108,7 @@ class EnOceanBinarySensor(EnOceanEntity, BinarySensorEntity):
|
||||
self.hass.bus.fire(
|
||||
EVENT_BUTTON_PRESSED,
|
||||
{
|
||||
"id": self.dev_id,
|
||||
"id": self.address.to_bytelist(),
|
||||
"pushed": pushed,
|
||||
"which": self.which,
|
||||
"onoff": self.onoff,
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
"""Config flows for the EnOcean integration."""
|
||||
|
||||
import glob
|
||||
from typing import Any
|
||||
|
||||
from enocean_async import Gateway
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import usb
|
||||
@@ -19,7 +21,6 @@ from homeassistant.helpers.selector import (
|
||||
)
|
||||
from homeassistant.helpers.service_info.usb import UsbServiceInfo
|
||||
|
||||
from . import dongle
|
||||
from .const import DOMAIN, ERROR_INVALID_DONGLE_PATH, LOGGER, MANUFACTURER
|
||||
|
||||
MANUAL_SCHEMA = vol.Schema(
|
||||
@@ -29,6 +30,24 @@ MANUAL_SCHEMA = vol.Schema(
|
||||
)
|
||||
|
||||
|
||||
def _detect_usb_dongle() -> list[str]:
|
||||
"""Return a list of candidate paths for USB EnOcean dongles.
|
||||
|
||||
This method is currently a bit simplistic, it may need to be
|
||||
improved to support more configurations and OS.
|
||||
"""
|
||||
globs_to_test = [
|
||||
"/dev/tty*FTOA2PV*",
|
||||
"/dev/serial/by-id/*EnOcean*",
|
||||
"/dev/tty.usbserial-*",
|
||||
]
|
||||
found_paths = []
|
||||
for current_glob in globs_to_test:
|
||||
found_paths.extend(glob.glob(current_glob))
|
||||
|
||||
return found_paths
|
||||
|
||||
|
||||
class EnOceanFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle the enOcean config flows."""
|
||||
|
||||
@@ -107,7 +126,7 @@ class EnOceanFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
return await self.async_step_manual()
|
||||
return await self.async_step_manual(user_input)
|
||||
|
||||
devices = await self.hass.async_add_executor_job(dongle.detect)
|
||||
devices = await self.hass.async_add_executor_job(_detect_usb_dongle)
|
||||
if len(devices) == 0:
|
||||
return await self.async_step_manual()
|
||||
devices.append(self.MANUAL_PATH_VALUE)
|
||||
@@ -146,7 +165,17 @@ class EnOceanFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
async def validate_enocean_conf(self, user_input) -> bool:
|
||||
"""Return True if the user_input contains a valid dongle path."""
|
||||
dongle_path = user_input[CONF_DEVICE]
|
||||
return await self.hass.async_add_executor_job(dongle.validate_path, dongle_path)
|
||||
try:
|
||||
# Starting the gateway will raise an exception if it can't connect
|
||||
gateway = Gateway(port=dongle_path)
|
||||
await gateway.start()
|
||||
except ConnectionError as exception:
|
||||
LOGGER.warning("Dongle path %s is invalid: %s", dongle_path, str(exception))
|
||||
return False
|
||||
finally:
|
||||
gateway.stop()
|
||||
|
||||
return True
|
||||
|
||||
def create_enocean_entry(self, user_input):
|
||||
"""Create an entry for the provided configuration."""
|
||||
|
||||
@@ -1,88 +0,0 @@
|
||||
"""Representation of an EnOcean dongle."""
|
||||
|
||||
import glob
|
||||
import logging
|
||||
from os.path import basename, normpath
|
||||
|
||||
from enocean.communicators import SerialCommunicator
|
||||
from enocean.protocol.packet import RadioPacket
|
||||
import serial
|
||||
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send
|
||||
|
||||
from .const import SIGNAL_RECEIVE_MESSAGE, SIGNAL_SEND_MESSAGE
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EnOceanDongle:
|
||||
"""Representation of an EnOcean dongle.
|
||||
|
||||
The dongle is responsible for receiving the EnOcean frames,
|
||||
creating devices if needed, and dispatching messages to platforms.
|
||||
"""
|
||||
|
||||
def __init__(self, hass, serial_path):
|
||||
"""Initialize the EnOcean dongle."""
|
||||
|
||||
self._communicator = SerialCommunicator(
|
||||
port=serial_path, callback=self.callback
|
||||
)
|
||||
self.serial_path = serial_path
|
||||
self.identifier = basename(normpath(serial_path))
|
||||
self.hass = hass
|
||||
self.dispatcher_disconnect_handle = None
|
||||
|
||||
async def async_setup(self):
|
||||
"""Finish the setup of the bridge and supported platforms."""
|
||||
self._communicator.start()
|
||||
self.dispatcher_disconnect_handle = async_dispatcher_connect(
|
||||
self.hass, SIGNAL_SEND_MESSAGE, self._send_message_callback
|
||||
)
|
||||
|
||||
def unload(self):
|
||||
"""Disconnect callbacks established at init time."""
|
||||
if self.dispatcher_disconnect_handle:
|
||||
self.dispatcher_disconnect_handle()
|
||||
self.dispatcher_disconnect_handle = None
|
||||
|
||||
def _send_message_callback(self, command):
|
||||
"""Send a command through the EnOcean dongle."""
|
||||
self._communicator.send(command)
|
||||
|
||||
def callback(self, packet):
|
||||
"""Handle EnOcean device's callback.
|
||||
|
||||
This is the callback function called by python-enocean whenever there
|
||||
is an incoming packet.
|
||||
"""
|
||||
|
||||
if isinstance(packet, RadioPacket):
|
||||
_LOGGER.debug("Received radio packet: %s", packet)
|
||||
dispatcher_send(self.hass, SIGNAL_RECEIVE_MESSAGE, packet)
|
||||
|
||||
|
||||
def detect():
|
||||
"""Return a list of candidate paths for USB EnOcean dongles.
|
||||
|
||||
This method is currently a bit simplistic, it may need to be
|
||||
improved to support more configurations and OS.
|
||||
"""
|
||||
globs_to_test = ["/dev/tty*FTOA2PV*", "/dev/serial/by-id/*EnOcean*"]
|
||||
found_paths = []
|
||||
for current_glob in globs_to_test:
|
||||
found_paths.extend(glob.glob(current_glob))
|
||||
|
||||
return found_paths
|
||||
|
||||
|
||||
def validate_path(path: str):
|
||||
"""Return True if the provided path points to a valid serial port, False otherwise."""
|
||||
try:
|
||||
# Creating the serial communicator will raise an exception
|
||||
# if it cannot connect
|
||||
SerialCommunicator(port=path)
|
||||
except serial.SerialException as exception:
|
||||
_LOGGER.warning("Dongle path %s is invalid: %s", path, str(exception))
|
||||
return False
|
||||
return True
|
||||
@@ -1,12 +1,23 @@
|
||||
"""Representation of an EnOcean device."""
|
||||
|
||||
from enocean.protocol.packet import Packet
|
||||
from enocean.utils import combine_hex
|
||||
from enocean_async import EURID, Address, BaseAddress, ERP1Telegram, SenderAddress
|
||||
from enocean_async.esp3.packet import ESP3Packet, ESP3PacketType
|
||||
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send
|
||||
from homeassistant.helpers.entity import Entity
|
||||
|
||||
from .const import SIGNAL_RECEIVE_MESSAGE, SIGNAL_SEND_MESSAGE
|
||||
from .const import LOGGER, SIGNAL_RECEIVE_MESSAGE, SIGNAL_SEND_MESSAGE
|
||||
|
||||
|
||||
def combine_hex(dev_id: list[int]) -> int:
|
||||
"""Combine list of integer values to one big integer.
|
||||
|
||||
This function replaces the previously used function from the enocean library and is considered tech debt that will have to be replaced.
|
||||
"""
|
||||
value = 0
|
||||
for byte in dev_id:
|
||||
value = (value << 8) | (byte & 0xFF)
|
||||
return value
|
||||
|
||||
|
||||
class EnOceanEntity(Entity):
|
||||
@@ -14,7 +25,16 @@ class EnOceanEntity(Entity):
|
||||
|
||||
def __init__(self, dev_id: list[int]) -> None:
|
||||
"""Initialize the device."""
|
||||
self.dev_id = dev_id
|
||||
self.address: SenderAddress | None = None
|
||||
|
||||
try:
|
||||
address = Address.from_bytelist(dev_id)
|
||||
if address.is_eurid():
|
||||
self.address = EURID.from_number(address.to_number())
|
||||
elif address.is_base_address():
|
||||
self.address = BaseAddress.from_number(address.to_number())
|
||||
except ValueError:
|
||||
self.address = None
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""Register callbacks."""
|
||||
@@ -24,17 +44,25 @@ class EnOceanEntity(Entity):
|
||||
)
|
||||
)
|
||||
|
||||
def _message_received_callback(self, packet):
|
||||
def _message_received_callback(self, telegram: ERP1Telegram) -> None:
|
||||
"""Handle incoming packets."""
|
||||
if not self.address:
|
||||
return
|
||||
|
||||
if packet.sender_int == combine_hex(self.dev_id):
|
||||
self.value_changed(packet)
|
||||
if telegram.sender == self.address:
|
||||
self.value_changed(telegram)
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of the device when a packet arrives."""
|
||||
|
||||
def send_command(self, data, optional, packet_type):
|
||||
"""Send a command via the EnOcean dongle."""
|
||||
|
||||
packet = Packet(packet_type, data=data, optional=optional)
|
||||
dispatcher_send(self.hass, SIGNAL_SEND_MESSAGE, packet)
|
||||
def send_command(
|
||||
self, data: list[int], optional: list[int], packet_type: ESP3PacketType
|
||||
) -> None:
|
||||
"""Send a command via the EnOcean dongle, if data and optional are valid bytes; otherwise, ignore."""
|
||||
try:
|
||||
packet = ESP3Packet(packet_type, data=bytes(data), optional=bytes(optional))
|
||||
dispatcher_send(self.hass, SIGNAL_SEND_MESSAGE, packet)
|
||||
except ValueError as err:
|
||||
LOGGER.warning(
|
||||
"Failed to send command: invalid data or optional bytes: %s", err
|
||||
)
|
||||
|
||||
@@ -5,7 +5,8 @@ from __future__ import annotations
|
||||
import math
|
||||
from typing import Any
|
||||
|
||||
from enocean.utils import combine_hex
|
||||
from enocean_async import ERP1Telegram
|
||||
from enocean_async.esp3.packet import ESP3PacketType
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.light import (
|
||||
@@ -20,7 +21,7 @@ from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from .entity import EnOceanEntity
|
||||
from .entity import EnOceanEntity, combine_hex
|
||||
|
||||
CONF_SENDER_ID = "sender_id"
|
||||
|
||||
@@ -75,7 +76,8 @@ class EnOceanLight(EnOceanEntity, LightEntity):
|
||||
command = [0xA5, 0x02, bval, 0x01, 0x09]
|
||||
command.extend(self._sender_id)
|
||||
command.extend([0x00])
|
||||
self.send_command(command, [], 0x01)
|
||||
packet_type = ESP3PacketType(0x01)
|
||||
self.send_command(command, [], packet_type)
|
||||
self._attr_is_on = True
|
||||
|
||||
def turn_off(self, **kwargs: Any) -> None:
|
||||
@@ -83,17 +85,18 @@ class EnOceanLight(EnOceanEntity, LightEntity):
|
||||
command = [0xA5, 0x02, 0x00, 0x01, 0x09]
|
||||
command.extend(self._sender_id)
|
||||
command.extend([0x00])
|
||||
self.send_command(command, [], 0x01)
|
||||
packet_type = ESP3PacketType(0x01)
|
||||
self.send_command(command, [], packet_type)
|
||||
self._attr_is_on = False
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of this device.
|
||||
|
||||
Dimmer devices like Eltako FUD61 send telegram in different RORGs.
|
||||
We only care about the 4BS (0xA5).
|
||||
"""
|
||||
if packet.data[0] == 0xA5 and packet.data[1] == 0x02:
|
||||
val = packet.data[2]
|
||||
if telegram.rorg == 0xA5 and telegram.telegram_data[0] == 0x02:
|
||||
val = telegram.telegram_data[1]
|
||||
self._attr_brightness = math.floor(val / 100.0 * 256.0)
|
||||
self._attr_is_on = bool(val != 0)
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
"documentation": "https://www.home-assistant.io/integrations/enocean",
|
||||
"integration_type": "hub",
|
||||
"iot_class": "local_push",
|
||||
"loggers": ["enocean"],
|
||||
"requirements": ["enocean==0.50"],
|
||||
"loggers": ["enocean_async"],
|
||||
"requirements": ["enocean-async==0.4.1"],
|
||||
"single_config_entry": true,
|
||||
"usb": [
|
||||
{
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
|
||||
from enocean.utils import combine_hex
|
||||
from enocean_async import EEP, EEP_SPECIFICATIONS, EEPHandler, EEPMessage, ERP1Telegram
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.sensor import (
|
||||
@@ -30,7 +30,7 @@ from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from .entity import EnOceanEntity
|
||||
from .entity import EnOceanEntity, combine_hex
|
||||
|
||||
CONF_MAX_TEMP = "max_temp"
|
||||
CONF_MIN_TEMP = "min_temp"
|
||||
@@ -166,7 +166,7 @@ class EnOceanSensor(EnOceanEntity, RestoreSensor):
|
||||
if (sensor_data := await self.async_get_last_sensor_data()) is not None:
|
||||
self._attr_native_value = sensor_data.native_value
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of the sensor."""
|
||||
|
||||
|
||||
@@ -177,15 +177,19 @@ class EnOceanPowerSensor(EnOceanSensor):
|
||||
- A5-12-01 (Automated Meter Reading, Electricity)
|
||||
"""
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of the sensor."""
|
||||
if packet.rorg != 0xA5:
|
||||
if telegram.rorg != 0xA5:
|
||||
return
|
||||
packet.parse_eep(0x12, 0x01)
|
||||
if packet.parsed["DT"]["raw_value"] == 1:
|
||||
|
||||
if (eep := EEP_SPECIFICATIONS.get(EEP(0xA5, 0x12, 0x01))) is None:
|
||||
return
|
||||
msg: EEPMessage = EEPHandler(eep).decode(telegram)
|
||||
|
||||
if "DT" in msg.values and msg.values["DT"].raw == 1:
|
||||
# this packet reports the current value
|
||||
raw_val = packet.parsed["MR"]["raw_value"]
|
||||
divisor = packet.parsed["DIV"]["raw_value"]
|
||||
raw_val = msg.values["MR"].raw
|
||||
divisor = msg.values["DIV"].raw
|
||||
self._attr_native_value = raw_val / (10**divisor)
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
@@ -226,13 +230,13 @@ class EnOceanTemperatureSensor(EnOceanSensor):
|
||||
self.range_from = range_from
|
||||
self.range_to = range_to
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of the sensor."""
|
||||
if packet.data[0] != 0xA5:
|
||||
if telegram.rorg != 0xA5:
|
||||
return
|
||||
temp_scale = self._scale_max - self._scale_min
|
||||
temp_range = self.range_to - self.range_from
|
||||
raw_val = packet.data[3]
|
||||
raw_val = telegram.telegram_data[2]
|
||||
temperature = temp_scale / temp_range * (raw_val - self.range_from)
|
||||
temperature += self._scale_min
|
||||
self._attr_native_value = round(temperature, 1)
|
||||
@@ -248,11 +252,11 @@ class EnOceanHumiditySensor(EnOceanSensor):
|
||||
- A5-10-10 to A5-10-14 (Room Operating Panels)
|
||||
"""
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of the sensor."""
|
||||
if packet.rorg != 0xA5:
|
||||
if telegram.rorg != 0xA5:
|
||||
return
|
||||
humidity = packet.data[2] * 100 / 250
|
||||
humidity = telegram.telegram_data[1] * 100 / 250
|
||||
self._attr_native_value = round(humidity, 1)
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
@@ -264,9 +268,9 @@ class EnOceanWindowHandle(EnOceanSensor):
|
||||
- F6-10-00 (Mechanical handle / Hoppe AG)
|
||||
"""
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of the sensor."""
|
||||
action = (packet.data[1] & 0x70) >> 4
|
||||
action = (telegram.telegram_data[0] & 0x70) >> 4
|
||||
|
||||
if action == 0x07:
|
||||
self._attr_native_value = STATE_CLOSED
|
||||
|
||||
@@ -4,7 +4,8 @@ from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from enocean.utils import combine_hex
|
||||
from enocean_async import EEP, EEP_SPECIFICATIONS, EEPHandler, EEPMessage, ERP1Telegram
|
||||
from enocean_async.esp3.packet import ESP3PacketType
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.switch import (
|
||||
@@ -18,7 +19,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .entity import EnOceanEntity
|
||||
from .entity import EnOceanEntity, combine_hex
|
||||
|
||||
CONF_CHANNEL = "channel"
|
||||
DEFAULT_NAME = "EnOcean Switch"
|
||||
@@ -86,52 +87,68 @@ class EnOceanSwitch(EnOceanEntity, SwitchEntity):
|
||||
"""Initialize the EnOcean switch device."""
|
||||
super().__init__(dev_id)
|
||||
self._light = None
|
||||
self.channel = channel
|
||||
self.channel: int = channel
|
||||
self._attr_unique_id = generate_unique_id(dev_id, channel)
|
||||
self._attr_name = dev_name
|
||||
|
||||
def turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn on the switch."""
|
||||
if not self.address:
|
||||
return
|
||||
|
||||
optional = [0x03]
|
||||
optional.extend(self.dev_id)
|
||||
optional.extend(self.address.to_bytelist())
|
||||
optional.extend([0xFF, 0x00])
|
||||
self.send_command(
|
||||
data=[0xD2, 0x01, self.channel & 0xFF, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00],
|
||||
optional=optional,
|
||||
packet_type=0x01,
|
||||
packet_type=ESP3PacketType(0x01),
|
||||
)
|
||||
self._attr_is_on = True
|
||||
|
||||
def turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn off the switch."""
|
||||
if not self.address:
|
||||
return
|
||||
optional = [0x03]
|
||||
optional.extend(self.dev_id)
|
||||
optional.extend(self.address.to_bytelist())
|
||||
optional.extend([0xFF, 0x00])
|
||||
self.send_command(
|
||||
data=[0xD2, 0x01, self.channel & 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
|
||||
optional=optional,
|
||||
packet_type=0x01,
|
||||
packet_type=ESP3PacketType(0x01),
|
||||
)
|
||||
self._attr_is_on = False
|
||||
|
||||
def value_changed(self, packet):
|
||||
def value_changed(self, telegram: ERP1Telegram) -> None:
|
||||
"""Update the internal state of the switch."""
|
||||
if packet.data[0] == 0xA5:
|
||||
# power meter telegram, turn on if > 10 watts
|
||||
packet.parse_eep(0x12, 0x01)
|
||||
if packet.parsed["DT"]["raw_value"] == 1:
|
||||
raw_val = packet.parsed["MR"]["raw_value"]
|
||||
divisor = packet.parsed["DIV"]["raw_value"]
|
||||
if telegram.rorg == 0xA5:
|
||||
# power meter telegram, turn on if > 1 watts
|
||||
if (eep := EEP_SPECIFICATIONS.get(EEP(0xA5, 0x12, 0x01))) is None:
|
||||
LOGGER.warning("EEP A5-12-01 cannot be decoded")
|
||||
return
|
||||
|
||||
msg: EEPMessage = EEPHandler(eep).decode(telegram)
|
||||
|
||||
if "DT" in msg.values and msg.values["DT"].raw == 1:
|
||||
# this packet reports the current value
|
||||
raw_val = msg.values["MR"].raw
|
||||
divisor = msg.values["DIV"].raw
|
||||
watts = raw_val / (10**divisor)
|
||||
if watts > 1:
|
||||
self._attr_is_on = True
|
||||
self.schedule_update_ha_state()
|
||||
elif packet.data[0] == 0xD2:
|
||||
|
||||
elif telegram.rorg == 0xD2:
|
||||
# actuator status telegram
|
||||
packet.parse_eep(0x01, 0x01)
|
||||
if packet.parsed["CMD"]["raw_value"] == 4:
|
||||
channel = packet.parsed["IO"]["raw_value"]
|
||||
output = packet.parsed["OV"]["raw_value"]
|
||||
if (eep := EEP_SPECIFICATIONS.get(EEP(0xD2, 0x01, 0x01))) is None:
|
||||
LOGGER.warning("EEP D2-01-01 cannot be decoded")
|
||||
return
|
||||
|
||||
msg = EEPHandler(eep).decode(telegram)
|
||||
if msg.values["CMD"].raw == 4:
|
||||
channel = msg.values["I/O"].raw
|
||||
output = msg.values["OV"].raw
|
||||
if channel == self.channel:
|
||||
self._attr_is_on = output > 0
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
2
requirements_all.txt
generated
2
requirements_all.txt
generated
@@ -900,7 +900,7 @@ energyid-webhooks==0.0.14
|
||||
energyzero==4.0.1
|
||||
|
||||
# homeassistant.components.enocean
|
||||
enocean==0.50
|
||||
enocean-async==0.4.1
|
||||
|
||||
# homeassistant.components.entur_public_transport
|
||||
enturclient==0.2.4
|
||||
|
||||
2
requirements_test_all.txt
generated
2
requirements_test_all.txt
generated
@@ -794,7 +794,7 @@ energyid-webhooks==0.0.14
|
||||
energyzero==4.0.1
|
||||
|
||||
# homeassistant.components.enocean
|
||||
enocean==0.50
|
||||
enocean-async==0.4.1
|
||||
|
||||
# homeassistant.components.environment_canada
|
||||
env-canada==0.13.2
|
||||
|
||||
@@ -187,7 +187,6 @@ EXCEPTIONS = {
|
||||
"crownstone-sse", # https://github.com/crownstone/crownstone-lib-python-sse/pull/2
|
||||
"crownstone-uart", # https://github.com/crownstone/crownstone-lib-python-uart/pull/12
|
||||
"eliqonline", # https://github.com/molobrakos/eliqonline/pull/17
|
||||
"enocean", # https://github.com/kipe/enocean/pull/142
|
||||
"imutils", # https://github.com/PyImageSearch/imutils/pull/292
|
||||
"iso4217", # Public domain
|
||||
"kiwiki_client", # https://github.com/c7h/kiwiki_client/pull/6
|
||||
|
||||
@@ -17,8 +17,8 @@ from homeassistant.helpers.service_info.usb import UsbServiceInfo
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
DONGLE_VALIDATE_PATH_METHOD = "homeassistant.components.enocean.dongle.validate_path"
|
||||
DONGLE_DETECT_METHOD = "homeassistant.components.enocean.dongle.detect"
|
||||
GATEWAY_CLASS = "homeassistant.components.enocean.config_flow.Gateway"
|
||||
GLOB_METHOD = "homeassistant.components.enocean.config_flow.glob.glob"
|
||||
SETUP_ENTRY_METHOD = "homeassistant.components.enocean.async_setup_entry"
|
||||
|
||||
|
||||
@@ -29,10 +29,9 @@ async def test_user_flow_cannot_create_multiple_instances(hass: HomeAssistant) -
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
with patch(DONGLE_VALIDATE_PATH_METHOD, Mock(return_value=True)):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "single_instance_allowed"
|
||||
@@ -42,7 +41,7 @@ async def test_user_flow_with_detected_dongle(hass: HomeAssistant) -> None:
|
||||
"""Test the user flow with a detected EnOcean dongle."""
|
||||
FAKE_DONGLE_PATH = "/fake/dongle"
|
||||
|
||||
with patch(DONGLE_DETECT_METHOD, Mock(return_value=[FAKE_DONGLE_PATH])):
|
||||
with patch(GLOB_METHOD, side_effect=[[FAKE_DONGLE_PATH], [], []]):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
)
|
||||
@@ -55,8 +54,8 @@ async def test_user_flow_with_detected_dongle(hass: HomeAssistant) -> None:
|
||||
|
||||
|
||||
async def test_user_flow_with_no_detected_dongle(hass: HomeAssistant) -> None:
|
||||
"""Test the user flow with a detected EnOcean dongle."""
|
||||
with patch(DONGLE_DETECT_METHOD, Mock(return_value=[])):
|
||||
"""Test the user flow with no detected EnOcean dongle."""
|
||||
with patch(GLOB_METHOD, return_value=[]):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
)
|
||||
@@ -69,7 +68,10 @@ async def test_detection_flow_with_valid_path(hass: HomeAssistant) -> None:
|
||||
"""Test the detection flow with a valid path selected."""
|
||||
USER_PROVIDED_PATH = "/user/provided/path"
|
||||
|
||||
with patch(DONGLE_VALIDATE_PATH_METHOD, Mock(return_value=True)):
|
||||
with patch(
|
||||
GATEWAY_CLASS,
|
||||
return_value=Mock(start=AsyncMock(), stop=Mock()),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": "detect"}, data={CONF_DEVICE: USER_PROVIDED_PATH}
|
||||
)
|
||||
@@ -81,17 +83,12 @@ async def test_detection_flow_with_valid_path(hass: HomeAssistant) -> None:
|
||||
async def test_detection_flow_with_custom_path(hass: HomeAssistant) -> None:
|
||||
"""Test the detection flow with custom path selected."""
|
||||
USER_PROVIDED_PATH = EnOceanFlowHandler.MANUAL_PATH_VALUE
|
||||
FAKE_DONGLE_PATH = "/fake/dongle"
|
||||
|
||||
with (
|
||||
patch(DONGLE_VALIDATE_PATH_METHOD, Mock(return_value=True)),
|
||||
patch(DONGLE_DETECT_METHOD, Mock(return_value=[FAKE_DONGLE_PATH])),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": "detect"},
|
||||
data={CONF_DEVICE: USER_PROVIDED_PATH},
|
||||
)
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": "detect"},
|
||||
data={CONF_DEVICE: USER_PROVIDED_PATH},
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "manual"
|
||||
@@ -100,11 +97,12 @@ async def test_detection_flow_with_custom_path(hass: HomeAssistant) -> None:
|
||||
async def test_detection_flow_with_invalid_path(hass: HomeAssistant) -> None:
|
||||
"""Test the detection flow with an invalid path selected."""
|
||||
USER_PROVIDED_PATH = "/invalid/path"
|
||||
FAKE_DONGLE_PATH = "/fake/dongle"
|
||||
|
||||
with (
|
||||
patch(DONGLE_VALIDATE_PATH_METHOD, Mock(return_value=False)),
|
||||
patch(DONGLE_DETECT_METHOD, Mock(return_value=[FAKE_DONGLE_PATH])),
|
||||
with patch(
|
||||
GATEWAY_CLASS,
|
||||
return_value=Mock(
|
||||
start=AsyncMock(side_effect=ConnectionError("invalid path")), stop=Mock()
|
||||
),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
@@ -121,7 +119,10 @@ async def test_manual_flow_with_valid_path(hass: HomeAssistant) -> None:
|
||||
"""Test the manual flow with a valid path."""
|
||||
USER_PROVIDED_PATH = "/user/provided/path"
|
||||
|
||||
with patch(DONGLE_VALIDATE_PATH_METHOD, Mock(return_value=True)):
|
||||
with patch(
|
||||
GATEWAY_CLASS,
|
||||
return_value=Mock(start=AsyncMock(), stop=Mock()),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": "manual"}, data={CONF_DEVICE: USER_PROVIDED_PATH}
|
||||
)
|
||||
@@ -135,8 +136,10 @@ async def test_manual_flow_with_invalid_path(hass: HomeAssistant) -> None:
|
||||
USER_PROVIDED_PATH = "/user/provided/path"
|
||||
|
||||
with patch(
|
||||
DONGLE_VALIDATE_PATH_METHOD,
|
||||
Mock(return_value=False),
|
||||
GATEWAY_CLASS,
|
||||
return_value=Mock(
|
||||
start=AsyncMock(side_effect=ConnectionError("invalid path")), stop=Mock()
|
||||
),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": "manual"}, data={CONF_DEVICE: USER_PROVIDED_PATH}
|
||||
@@ -151,7 +154,10 @@ async def test_import_flow_with_valid_path(hass: HomeAssistant) -> None:
|
||||
"""Test the import flow with a valid path."""
|
||||
DATA_TO_IMPORT = {CONF_DEVICE: "/valid/path/to/import"}
|
||||
|
||||
with patch(DONGLE_VALIDATE_PATH_METHOD, Mock(return_value=True)):
|
||||
with patch(
|
||||
GATEWAY_CLASS,
|
||||
return_value=Mock(start=AsyncMock(), stop=Mock()),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
@@ -167,8 +173,10 @@ async def test_import_flow_with_invalid_path(hass: HomeAssistant) -> None:
|
||||
DATA_TO_IMPORT = {CONF_DEVICE: "/invalid/path/to/import"}
|
||||
|
||||
with patch(
|
||||
DONGLE_VALIDATE_PATH_METHOD,
|
||||
Mock(return_value=False),
|
||||
GATEWAY_CLASS,
|
||||
return_value=Mock(
|
||||
start=AsyncMock(side_effect=ConnectionError("invalid path")), stop=Mock()
|
||||
),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
@@ -206,7 +214,10 @@ async def test_usb_discovery(
|
||||
|
||||
# test device path
|
||||
with (
|
||||
patch(DONGLE_VALIDATE_PATH_METHOD, Mock(return_value=True)),
|
||||
patch(
|
||||
GATEWAY_CLASS,
|
||||
return_value=Mock(start=AsyncMock(), stop=Mock()),
|
||||
),
|
||||
patch(SETUP_ENTRY_METHOD, AsyncMock(return_value=True)),
|
||||
patch(
|
||||
"homeassistant.components.usb.get_serial_by_id",
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from serial import SerialException
|
||||
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
@@ -17,8 +15,8 @@ async def test_device_not_connected(
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.enocean.dongle.SerialCommunicator",
|
||||
side_effect=SerialException("Device not found"),
|
||||
"homeassistant.components.enocean.Gateway.start",
|
||||
side_effect=ConnectionError("Device not found"),
|
||||
):
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
"""Tests for the EnOcean switch platform."""
|
||||
|
||||
from enocean.utils import combine_hex
|
||||
|
||||
from homeassistant.components.enocean import DOMAIN
|
||||
from homeassistant.components.enocean.entity import combine_hex
|
||||
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
|
||||
Reference in New Issue
Block a user