1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-24 12:59:34 +00:00

Improve Tuya data validation (#157968)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
epenet
2025-12-04 22:20:05 +01:00
committed by GitHub
parent 2beb551db3
commit 4e8a31a4e2
5 changed files with 132 additions and 72 deletions

View File

@@ -14,7 +14,7 @@ from homeassistant.util import dt as dt_util
from . import TuyaConfigEntry
from .const import DOMAIN, DPCode
from .models import DEVICE_WARNINGS
from .type_information import DEVICE_WARNINGS
_REDACTED_DPCODES = {
DPCode.ALARM_MESSAGE,

View File

@@ -80,7 +80,7 @@ class _FanSpeedEnumWrapper(DPCodeEnumWrapper):
"""Get the number of speeds supported by the fan."""
return len(self.type_information.range)
def read_device_status(self, device: CustomerDevice) -> int | None: # type: ignore[override]
def read_device_status(self, device: CustomerDevice) -> int | None:
"""Get the current speed as a percentage."""
if (value := super().read_device_status(device)) is None:
return None

View File

@@ -9,7 +9,7 @@ from tuya_sharing import CustomerDevice
from homeassistant.util.json import json_loads
from .const import LOGGER, DPType
from .const import DPType
from .type_information import (
EnumTypeInformation,
IntegerTypeInformation,
@@ -17,24 +17,6 @@ from .type_information import (
find_dpcode,
)
# Dictionary to track logged warnings to avoid spamming logs
# Keyed by device ID
DEVICE_WARNINGS: dict[str, set[str]] = {}
def _should_log_warning(device_id: str, warning_key: str) -> bool:
"""Check if a warning has already been logged for a device and add it if not.
Returns: False if the warning was already logged, True if it was added.
"""
if (device_warnings := DEVICE_WARNINGS.get(device_id)) is None:
device_warnings = set()
DEVICE_WARNINGS[device_id] = device_warnings
if warning_key in device_warnings:
return False
DEVICE_WARNINGS[device_id].add(warning_key)
return True
class DeviceWrapper:
"""Base device wrapper."""
@@ -105,6 +87,12 @@ class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper):
super().__init__(dpcode)
self.type_information = type_information
def read_device_status(self, device: CustomerDevice) -> Any | None:
"""Read the device value for the dpcode."""
return self.type_information.process_raw_value(
self._read_device_status_raw(device), device
)
@classmethod
def find_dpcode(
cls,
@@ -145,12 +133,6 @@ class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
DPTYPE = DPType.BOOLEAN
def read_device_status(self, device: CustomerDevice) -> bool | None:
"""Read the device value for the dpcode."""
if (raw_value := self._read_device_status_raw(device)) in (True, False):
return raw_value
return None
def _convert_value_to_raw_value(
self, device: CustomerDevice, value: Any
) -> Any | None:
@@ -179,29 +161,6 @@ class DPCodeEnumWrapper(DPCodeTypeInformationWrapper[EnumTypeInformation]):
DPTYPE = DPType.ENUM
def read_device_status(self, device: CustomerDevice) -> str | None:
"""Read the device value for the dpcode.
Values outside of the list defined by the Enum type information will
return None.
"""
if (raw_value := self._read_device_status_raw(device)) is None:
return None
if raw_value not in self.type_information.range:
if _should_log_warning(
device.id, f"enum_out_range|{self.dpcode}|{raw_value}"
):
LOGGER.warning(
"Found invalid enum value `%s` for datapoint `%s` in product id `%s`,"
" expected one of `%s`; please report this defect to Tuya support",
raw_value,
self.dpcode,
device.product_id,
self.type_information.range,
)
return None
return raw_value
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
"""Convert a Home Assistant value back to a raw device value."""
if value in self.type_information.range:
@@ -223,15 +182,6 @@ class DPCodeIntegerWrapper(DPCodeTypeInformationWrapper[IntegerTypeInformation])
super().__init__(dpcode, type_information)
self.native_unit = type_information.unit
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode.
Value will be scaled based on the Integer type information.
"""
if (raw_value := self._read_device_status_raw(device)) is None:
return None
return raw_value / (10**self.type_information.scale)
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
"""Convert a Home Assistant value back to a raw device value."""
new_value = round(value * (10**self.type_information.scale))
@@ -250,10 +200,6 @@ class DPCodeStringWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
DPTYPE = DPType.STRING
def read_device_status(self, device: CustomerDevice) -> str | None:
"""Read the device value for the dpcode."""
return self._read_device_status_raw(device)
class DPCodeBitmapBitWrapper(DPCodeWrapper):
"""Simple wrapper for a specific bit in bitmap values."""

View File

@@ -9,12 +9,30 @@ from tuya_sharing import CustomerDevice
from homeassistant.util.json import json_loads_object
from .const import DPType
from .const import LOGGER, DPType
from .util import parse_dptype, remap_value
# Dictionary to track logged warnings to avoid spamming logs
# Keyed by device ID
DEVICE_WARNINGS: dict[str, set[str]] = {}
def _should_log_warning(device_id: str, warning_key: str) -> bool:
"""Check if a warning has already been logged for a device and add it if not.
Returns: True if the warning should be logged, False if it was already logged.
"""
if (device_warnings := DEVICE_WARNINGS.get(device_id)) is None:
device_warnings = set()
DEVICE_WARNINGS[device_id] = device_warnings
if warning_key in device_warnings:
return False
DEVICE_WARNINGS[device_id].add(warning_key)
return True
@dataclass(kw_only=True)
class TypeInformation:
class TypeInformation[T]:
"""Type information.
As provided by the SDK, from `device.function` / `device.status_range`.
@@ -23,6 +41,16 @@ class TypeInformation:
dpcode: str
type_data: str | None = None
def process_raw_value(
self, raw_value: Any | None, device: CustomerDevice
) -> T | None:
"""Read and process raw value against this type information.
Base implementation does no validation, subclasses may override to provide
specific validation.
"""
return raw_value
@classmethod
def from_json(cls, dpcode: str, type_data: str) -> Self | None:
"""Load JSON string and return a TypeInformation object."""
@@ -30,7 +58,7 @@ class TypeInformation:
@dataclass(kw_only=True)
class BitmapTypeInformation(TypeInformation):
class BitmapTypeInformation(TypeInformation[int]):
"""Bitmap type information."""
label: list[str]
@@ -48,11 +76,62 @@ class BitmapTypeInformation(TypeInformation):
@dataclass(kw_only=True)
class EnumTypeInformation(TypeInformation):
class BooleanTypeInformation(TypeInformation[bool]):
"""Boolean type information."""
def process_raw_value(
self, raw_value: Any | None, device: CustomerDevice
) -> bool | None:
"""Read and process raw value against this type information."""
if raw_value is None:
return None
# Validate input against defined range
if raw_value not in (True, False):
if _should_log_warning(
device.id, f"boolean_out_range|{self.dpcode}|{raw_value}"
):
LOGGER.warning(
"Found invalid boolean value `%s` for datapoint `%s` in product "
"id `%s`, expected one of `%s`; please report this defect to "
"Tuya support",
raw_value,
self.dpcode,
device.product_id,
(True, False),
)
return None
return raw_value
@dataclass(kw_only=True)
class EnumTypeInformation(TypeInformation[str]):
"""Enum type information."""
range: list[str]
def process_raw_value(
self, raw_value: Any | None, device: CustomerDevice
) -> str | None:
"""Read and process raw value against this type information."""
if raw_value is None:
return None
# Validate input against defined range
if raw_value not in self.range:
if _should_log_warning(
device.id, f"enum_out_range|{self.dpcode}|{raw_value}"
):
LOGGER.warning(
"Found invalid enum value `%s` for datapoint `%s` in product "
"id `%s`, expected one of `%s`; please report this defect to "
"Tuya support",
raw_value,
self.dpcode,
device.product_id,
self.range,
)
return None
return raw_value
@classmethod
def from_json(cls, dpcode: str, type_data: str) -> Self | None:
"""Load JSON string and return an EnumTypeInformation object."""
@@ -66,7 +145,7 @@ class EnumTypeInformation(TypeInformation):
@dataclass(kw_only=True)
class IntegerTypeInformation(TypeInformation):
class IntegerTypeInformation(TypeInformation[float]):
"""Integer type information."""
min: int
@@ -118,6 +197,31 @@ class IntegerTypeInformation(TypeInformation):
"""Remap a value from its current range to this range."""
return remap_value(value, from_min, from_max, self.min, self.max, reverse)
def process_raw_value(
self, raw_value: Any | None, device: CustomerDevice
) -> float | None:
"""Read and process raw value against this type information."""
if raw_value is None:
return None
# Validate input against defined range
if not isinstance(raw_value, int) or not (self.min <= raw_value <= self.max):
if _should_log_warning(
device.id, f"integer_out_range|{self.dpcode}|{raw_value}"
):
LOGGER.warning(
"Found invalid integer value `%s` for datapoint `%s` in product "
"id `%s`, expected integer value between %s and %s; please report "
"this defect to Tuya support",
raw_value,
self.dpcode,
device.product_id,
self.min,
self.max,
)
return None
return raw_value / (10**self.scale)
@classmethod
def from_json(cls, dpcode: str, type_data: str) -> Self | None:
"""Load JSON string and return an IntegerTypeInformation object."""
@@ -137,7 +241,7 @@ class IntegerTypeInformation(TypeInformation):
_TYPE_INFORMATION_MAPPINGS: dict[DPType, type[TypeInformation]] = {
DPType.BITMAP: BitmapTypeInformation,
DPType.BOOLEAN: TypeInformation,
DPType.BOOLEAN: BooleanTypeInformation,
DPType.ENUM: EnumTypeInformation,
DPType.INTEGER: IntegerTypeInformation,
DPType.JSON: TypeInformation,
@@ -156,6 +260,16 @@ def find_dpcode(
) -> BitmapTypeInformation | None: ...
@overload
def find_dpcode(
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.BOOLEAN],
) -> BooleanTypeInformation | None: ...
@overload
def find_dpcode(
device: CustomerDevice,
@@ -182,7 +296,7 @@ def find_dpcode(
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.BOOLEAN, DPType.JSON, DPType.RAW],
dptype: Literal[DPType.JSON, DPType.RAW],
) -> TypeInformation | None: ...

View File

@@ -583,7 +583,7 @@
]),
'supported_features': <ClimateEntityFeature: 401>,
'target_temp_step': 1.0,
'temperature': 2.3,
'temperature': None,
}),
'context': <ANY>,
'entity_id': 'climate.floor_thermostat_kitchen',
@@ -1501,7 +1501,7 @@
'max_temp': 66,
'min_temp': 12,
'target_temp_step': 1.0,
'temperature': 4,
'temperature': None,
})
# ---
# name: test_us_customary_system[climate.geti_solar_pv_water_heater]