From 4e8a31a4e2bba1ebe935684e8df1bf96b6632b6b Mon Sep 17 00:00:00 2001 From: epenet <6771947+epenet@users.noreply.github.com> Date: Thu, 4 Dec 2025 22:20:05 +0100 Subject: [PATCH] Improve Tuya data validation (#157968) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- homeassistant/components/tuya/diagnostics.py | 2 +- homeassistant/components/tuya/fan.py | 2 +- homeassistant/components/tuya/models.py | 68 +--------- .../components/tuya/type_information.py | 128 +++++++++++++++++- .../tuya/snapshots/test_climate.ambr | 4 +- 5 files changed, 132 insertions(+), 72 deletions(-) diff --git a/homeassistant/components/tuya/diagnostics.py b/homeassistant/components/tuya/diagnostics.py index a4f5ed07112..ec7fa3cb140 100644 --- a/homeassistant/components/tuya/diagnostics.py +++ b/homeassistant/components/tuya/diagnostics.py @@ -14,7 +14,7 @@ from homeassistant.util import dt as dt_util from . import TuyaConfigEntry from .const import DOMAIN, DPCode -from .models import DEVICE_WARNINGS +from .type_information import DEVICE_WARNINGS _REDACTED_DPCODES = { DPCode.ALARM_MESSAGE, diff --git a/homeassistant/components/tuya/fan.py b/homeassistant/components/tuya/fan.py index a0180fa76d6..585e69e97ca 100644 --- a/homeassistant/components/tuya/fan.py +++ b/homeassistant/components/tuya/fan.py @@ -80,7 +80,7 @@ class _FanSpeedEnumWrapper(DPCodeEnumWrapper): """Get the number of speeds supported by the fan.""" return len(self.type_information.range) - def read_device_status(self, device: CustomerDevice) -> int | None: # type: ignore[override] + def read_device_status(self, device: CustomerDevice) -> int | None: """Get the current speed as a percentage.""" if (value := super().read_device_status(device)) is None: return None diff --git a/homeassistant/components/tuya/models.py b/homeassistant/components/tuya/models.py index f157a52c1f4..5f09c2c9af7 100644 --- a/homeassistant/components/tuya/models.py +++ b/homeassistant/components/tuya/models.py @@ -9,7 +9,7 @@ from tuya_sharing import CustomerDevice from homeassistant.util.json import json_loads -from .const import LOGGER, DPType +from .const import DPType from .type_information import ( EnumTypeInformation, IntegerTypeInformation, @@ -17,24 +17,6 @@ from .type_information import ( find_dpcode, ) -# Dictionary to track logged warnings to avoid spamming logs -# Keyed by device ID -DEVICE_WARNINGS: dict[str, set[str]] = {} - - -def _should_log_warning(device_id: str, warning_key: str) -> bool: - """Check if a warning has already been logged for a device and add it if not. - - Returns: False if the warning was already logged, True if it was added. - """ - if (device_warnings := DEVICE_WARNINGS.get(device_id)) is None: - device_warnings = set() - DEVICE_WARNINGS[device_id] = device_warnings - if warning_key in device_warnings: - return False - DEVICE_WARNINGS[device_id].add(warning_key) - return True - class DeviceWrapper: """Base device wrapper.""" @@ -105,6 +87,12 @@ class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper): super().__init__(dpcode) self.type_information = type_information + def read_device_status(self, device: CustomerDevice) -> Any | None: + """Read the device value for the dpcode.""" + return self.type_information.process_raw_value( + self._read_device_status_raw(device), device + ) + @classmethod def find_dpcode( cls, @@ -145,12 +133,6 @@ class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[TypeInformation]): DPTYPE = DPType.BOOLEAN - def read_device_status(self, device: CustomerDevice) -> bool | None: - """Read the device value for the dpcode.""" - if (raw_value := self._read_device_status_raw(device)) in (True, False): - return raw_value - return None - def _convert_value_to_raw_value( self, device: CustomerDevice, value: Any ) -> Any | None: @@ -179,29 +161,6 @@ class DPCodeEnumWrapper(DPCodeTypeInformationWrapper[EnumTypeInformation]): DPTYPE = DPType.ENUM - def read_device_status(self, device: CustomerDevice) -> str | None: - """Read the device value for the dpcode. - - Values outside of the list defined by the Enum type information will - return None. - """ - if (raw_value := self._read_device_status_raw(device)) is None: - return None - if raw_value not in self.type_information.range: - if _should_log_warning( - device.id, f"enum_out_range|{self.dpcode}|{raw_value}" - ): - LOGGER.warning( - "Found invalid enum value `%s` for datapoint `%s` in product id `%s`," - " expected one of `%s`; please report this defect to Tuya support", - raw_value, - self.dpcode, - device.product_id, - self.type_information.range, - ) - return None - return raw_value - def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any: """Convert a Home Assistant value back to a raw device value.""" if value in self.type_information.range: @@ -223,15 +182,6 @@ class DPCodeIntegerWrapper(DPCodeTypeInformationWrapper[IntegerTypeInformation]) super().__init__(dpcode, type_information) self.native_unit = type_information.unit - def read_device_status(self, device: CustomerDevice) -> float | None: - """Read the device value for the dpcode. - - Value will be scaled based on the Integer type information. - """ - if (raw_value := self._read_device_status_raw(device)) is None: - return None - return raw_value / (10**self.type_information.scale) - def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any: """Convert a Home Assistant value back to a raw device value.""" new_value = round(value * (10**self.type_information.scale)) @@ -250,10 +200,6 @@ class DPCodeStringWrapper(DPCodeTypeInformationWrapper[TypeInformation]): DPTYPE = DPType.STRING - def read_device_status(self, device: CustomerDevice) -> str | None: - """Read the device value for the dpcode.""" - return self._read_device_status_raw(device) - class DPCodeBitmapBitWrapper(DPCodeWrapper): """Simple wrapper for a specific bit in bitmap values.""" diff --git a/homeassistant/components/tuya/type_information.py b/homeassistant/components/tuya/type_information.py index 0e2af878c2d..5180e5ca5da 100644 --- a/homeassistant/components/tuya/type_information.py +++ b/homeassistant/components/tuya/type_information.py @@ -9,12 +9,30 @@ from tuya_sharing import CustomerDevice from homeassistant.util.json import json_loads_object -from .const import DPType +from .const import LOGGER, DPType from .util import parse_dptype, remap_value +# Dictionary to track logged warnings to avoid spamming logs +# Keyed by device ID +DEVICE_WARNINGS: dict[str, set[str]] = {} + + +def _should_log_warning(device_id: str, warning_key: str) -> bool: + """Check if a warning has already been logged for a device and add it if not. + + Returns: True if the warning should be logged, False if it was already logged. + """ + if (device_warnings := DEVICE_WARNINGS.get(device_id)) is None: + device_warnings = set() + DEVICE_WARNINGS[device_id] = device_warnings + if warning_key in device_warnings: + return False + DEVICE_WARNINGS[device_id].add(warning_key) + return True + @dataclass(kw_only=True) -class TypeInformation: +class TypeInformation[T]: """Type information. As provided by the SDK, from `device.function` / `device.status_range`. @@ -23,6 +41,16 @@ class TypeInformation: dpcode: str type_data: str | None = None + def process_raw_value( + self, raw_value: Any | None, device: CustomerDevice + ) -> T | None: + """Read and process raw value against this type information. + + Base implementation does no validation, subclasses may override to provide + specific validation. + """ + return raw_value + @classmethod def from_json(cls, dpcode: str, type_data: str) -> Self | None: """Load JSON string and return a TypeInformation object.""" @@ -30,7 +58,7 @@ class TypeInformation: @dataclass(kw_only=True) -class BitmapTypeInformation(TypeInformation): +class BitmapTypeInformation(TypeInformation[int]): """Bitmap type information.""" label: list[str] @@ -48,11 +76,62 @@ class BitmapTypeInformation(TypeInformation): @dataclass(kw_only=True) -class EnumTypeInformation(TypeInformation): +class BooleanTypeInformation(TypeInformation[bool]): + """Boolean type information.""" + + def process_raw_value( + self, raw_value: Any | None, device: CustomerDevice + ) -> bool | None: + """Read and process raw value against this type information.""" + if raw_value is None: + return None + # Validate input against defined range + if raw_value not in (True, False): + if _should_log_warning( + device.id, f"boolean_out_range|{self.dpcode}|{raw_value}" + ): + LOGGER.warning( + "Found invalid boolean value `%s` for datapoint `%s` in product " + "id `%s`, expected one of `%s`; please report this defect to " + "Tuya support", + raw_value, + self.dpcode, + device.product_id, + (True, False), + ) + return None + return raw_value + + +@dataclass(kw_only=True) +class EnumTypeInformation(TypeInformation[str]): """Enum type information.""" range: list[str] + def process_raw_value( + self, raw_value: Any | None, device: CustomerDevice + ) -> str | None: + """Read and process raw value against this type information.""" + if raw_value is None: + return None + # Validate input against defined range + if raw_value not in self.range: + if _should_log_warning( + device.id, f"enum_out_range|{self.dpcode}|{raw_value}" + ): + LOGGER.warning( + "Found invalid enum value `%s` for datapoint `%s` in product " + "id `%s`, expected one of `%s`; please report this defect to " + "Tuya support", + raw_value, + self.dpcode, + device.product_id, + self.range, + ) + return None + return raw_value + @classmethod def from_json(cls, dpcode: str, type_data: str) -> Self | None: """Load JSON string and return an EnumTypeInformation object.""" @@ -66,7 +145,7 @@ class EnumTypeInformation(TypeInformation): @dataclass(kw_only=True) -class IntegerTypeInformation(TypeInformation): +class IntegerTypeInformation(TypeInformation[float]): """Integer type information.""" min: int @@ -118,6 +197,31 @@ class IntegerTypeInformation(TypeInformation): """Remap a value from its current range to this range.""" return remap_value(value, from_min, from_max, self.min, self.max, reverse) + def process_raw_value( + self, raw_value: Any | None, device: CustomerDevice + ) -> float | None: + """Read and process raw value against this type information.""" + if raw_value is None: + return None + # Validate input against defined range + if not isinstance(raw_value, int) or not (self.min <= raw_value <= self.max): + if _should_log_warning( + device.id, f"integer_out_range|{self.dpcode}|{raw_value}" + ): + LOGGER.warning( + "Found invalid integer value `%s` for datapoint `%s` in product " + "id `%s`, expected integer value between %s and %s; please report " + "this defect to Tuya support", + raw_value, + self.dpcode, + device.product_id, + self.min, + self.max, + ) + + return None + return raw_value / (10**self.scale) + @classmethod def from_json(cls, dpcode: str, type_data: str) -> Self | None: """Load JSON string and return an IntegerTypeInformation object.""" @@ -137,7 +241,7 @@ class IntegerTypeInformation(TypeInformation): _TYPE_INFORMATION_MAPPINGS: dict[DPType, type[TypeInformation]] = { DPType.BITMAP: BitmapTypeInformation, - DPType.BOOLEAN: TypeInformation, + DPType.BOOLEAN: BooleanTypeInformation, DPType.ENUM: EnumTypeInformation, DPType.INTEGER: IntegerTypeInformation, DPType.JSON: TypeInformation, @@ -156,6 +260,16 @@ def find_dpcode( ) -> BitmapTypeInformation | None: ... +@overload +def find_dpcode( + device: CustomerDevice, + dpcodes: str | tuple[str, ...] | None, + *, + prefer_function: bool = False, + dptype: Literal[DPType.BOOLEAN], +) -> BooleanTypeInformation | None: ... + + @overload def find_dpcode( device: CustomerDevice, @@ -182,7 +296,7 @@ def find_dpcode( dpcodes: str | tuple[str, ...] | None, *, prefer_function: bool = False, - dptype: Literal[DPType.BOOLEAN, DPType.JSON, DPType.RAW], + dptype: Literal[DPType.JSON, DPType.RAW], ) -> TypeInformation | None: ... diff --git a/tests/components/tuya/snapshots/test_climate.ambr b/tests/components/tuya/snapshots/test_climate.ambr index 542c0a5c8f5..9937b680b2e 100644 --- a/tests/components/tuya/snapshots/test_climate.ambr +++ b/tests/components/tuya/snapshots/test_climate.ambr @@ -583,7 +583,7 @@ ]), 'supported_features': , 'target_temp_step': 1.0, - 'temperature': 2.3, + 'temperature': None, }), 'context': , 'entity_id': 'climate.floor_thermostat_kitchen', @@ -1501,7 +1501,7 @@ 'max_temp': 66, 'min_temp': 12, 'target_temp_step': 1.0, - 'temperature': 4, + 'temperature': None, }) # --- # name: test_us_customary_system[climate.geti_solar_pv_water_heater]