mirror of
https://github.com/home-assistant/core.git
synced 2026-02-28 05:46:00 +00:00
Replace Tuya remap methods with helper class (#158718)
This commit is contained in:
@@ -23,11 +23,18 @@ from . import TuyaConfigEntry
|
||||
from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode
|
||||
from .entity import TuyaEntity
|
||||
from .models import DPCodeBooleanWrapper, DPCodeEnumWrapper, DPCodeIntegerWrapper
|
||||
from .type_information import IntegerTypeInformation
|
||||
from .util import RemapHelper
|
||||
|
||||
|
||||
class _DPCodePercentageMappingWrapper(DPCodeIntegerWrapper):
|
||||
"""Wrapper for DPCode position values mapping to 0-100 range."""
|
||||
|
||||
def __init__(self, dpcode: str, type_information: IntegerTypeInformation) -> None:
|
||||
"""Init DPCodeIntegerWrapper."""
|
||||
super().__init__(dpcode, type_information)
|
||||
self._remap_helper = RemapHelper.from_type_information(type_information, 0, 100)
|
||||
|
||||
def _position_reversed(self, device: CustomerDevice) -> bool:
|
||||
"""Check if the position and direction should be reversed."""
|
||||
return False
|
||||
@@ -37,21 +44,15 @@ class _DPCodePercentageMappingWrapper(DPCodeIntegerWrapper):
|
||||
return None
|
||||
|
||||
return round(
|
||||
self.type_information.remap_value_to(
|
||||
value,
|
||||
0,
|
||||
100,
|
||||
self._position_reversed(device),
|
||||
self._remap_helper.remap_value_to(
|
||||
value, reverse=self._position_reversed(device)
|
||||
)
|
||||
)
|
||||
|
||||
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
|
||||
return round(
|
||||
self.type_information.remap_value_from(
|
||||
value,
|
||||
0,
|
||||
100,
|
||||
self._position_reversed(device),
|
||||
self._remap_helper.remap_value_from(
|
||||
value, reverse=self._position_reversed(device)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -24,7 +24,8 @@ from . import TuyaConfigEntry
|
||||
from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode
|
||||
from .entity import TuyaEntity
|
||||
from .models import DPCodeBooleanWrapper, DPCodeEnumWrapper, DPCodeIntegerWrapper
|
||||
from .util import get_dpcode
|
||||
from .type_information import IntegerTypeInformation
|
||||
from .util import RemapHelper, get_dpcode
|
||||
|
||||
_DIRECTION_DPCODES = (DPCode.FAN_DIRECTION,)
|
||||
_MODE_DPCODES = (DPCode.FAN_MODE, DPCode.MODE)
|
||||
@@ -94,6 +95,11 @@ class _FanSpeedEnumWrapper(DPCodeEnumWrapper):
|
||||
class _FanSpeedIntegerWrapper(DPCodeIntegerWrapper):
|
||||
"""Wrapper for fan speed DP code (from an integer)."""
|
||||
|
||||
def __init__(self, dpcode: str, type_information: IntegerTypeInformation) -> None:
|
||||
"""Init DPCodeIntegerWrapper."""
|
||||
super().__init__(dpcode, type_information)
|
||||
self._remap_helper = RemapHelper.from_type_information(type_information, 1, 100)
|
||||
|
||||
def get_speed_count(self) -> int:
|
||||
"""Get the number of speeds supported by the fan."""
|
||||
return 100
|
||||
@@ -102,11 +108,11 @@ class _FanSpeedIntegerWrapper(DPCodeIntegerWrapper):
|
||||
"""Get the current speed as a percentage."""
|
||||
if (value := super().read_device_status(device)) is None:
|
||||
return None
|
||||
return round(self.type_information.remap_value_to(value, 1, 100))
|
||||
return round(self._remap_helper.remap_value_to(value))
|
||||
|
||||
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
|
||||
"""Convert a Home Assistant value back to a raw device value."""
|
||||
return round(self.type_information.remap_value_from(value, 1, 100))
|
||||
return round(self._remap_helper.remap_value_from(value))
|
||||
|
||||
|
||||
def _get_speed_wrapper(
|
||||
|
||||
@@ -37,7 +37,7 @@ from .models import (
|
||||
DPCodeJsonWrapper,
|
||||
)
|
||||
from .type_information import IntegerTypeInformation
|
||||
from .util import remap_value
|
||||
from .util import RemapHelper
|
||||
|
||||
|
||||
class _BrightnessWrapper(DPCodeIntegerWrapper):
|
||||
@@ -50,6 +50,13 @@ class _BrightnessWrapper(DPCodeIntegerWrapper):
|
||||
|
||||
brightness_min: DPCodeIntegerWrapper | None = None
|
||||
brightness_max: DPCodeIntegerWrapper | None = None
|
||||
brightness_min_remap: RemapHelper | None = None
|
||||
brightness_max_remap: RemapHelper | None = None
|
||||
|
||||
def __init__(self, dpcode: str, type_information: IntegerTypeInformation) -> None:
|
||||
"""Init DPCodeIntegerWrapper."""
|
||||
super().__init__(dpcode, type_information)
|
||||
self._remap_helper = RemapHelper.from_type_information(type_information, 0, 255)
|
||||
|
||||
def read_device_status(self, device: CustomerDevice) -> Any | None:
|
||||
"""Return the brightness of this light between 0..255."""
|
||||
@@ -57,29 +64,31 @@ class _BrightnessWrapper(DPCodeIntegerWrapper):
|
||||
return None
|
||||
|
||||
# Remap value to our scale
|
||||
brightness = self.type_information.remap_value_to(brightness)
|
||||
brightness = self._remap_helper.remap_value_to(brightness)
|
||||
|
||||
# If there is a min/max value, the brightness is actually limited.
|
||||
# Meaning it is actually not on a 0-255 scale.
|
||||
if (
|
||||
self.brightness_max is not None
|
||||
and self.brightness_min is not None
|
||||
and self.brightness_max_remap is not None
|
||||
and self.brightness_min_remap is not None
|
||||
and (brightness_max := device.status.get(self.brightness_max.dpcode))
|
||||
is not None
|
||||
and (brightness_min := device.status.get(self.brightness_min.dpcode))
|
||||
is not None
|
||||
):
|
||||
# Remap values onto our scale
|
||||
brightness_max = self.brightness_max.type_information.remap_value_to(
|
||||
brightness_max
|
||||
)
|
||||
brightness_min = self.brightness_min.type_information.remap_value_to(
|
||||
brightness_min
|
||||
)
|
||||
brightness_max = self.brightness_max_remap.remap_value_to(brightness_max)
|
||||
brightness_min = self.brightness_min_remap.remap_value_to(brightness_min)
|
||||
|
||||
# Remap the brightness value from their min-max to our 0-255 scale
|
||||
brightness = remap_value(
|
||||
brightness, from_min=brightness_min, from_max=brightness_max
|
||||
brightness = RemapHelper.remap_value(
|
||||
brightness,
|
||||
from_min=brightness_min,
|
||||
from_max=brightness_max,
|
||||
to_min=0,
|
||||
to_max=255,
|
||||
)
|
||||
|
||||
return round(brightness)
|
||||
@@ -91,72 +100,69 @@ class _BrightnessWrapper(DPCodeIntegerWrapper):
|
||||
if (
|
||||
self.brightness_max is not None
|
||||
and self.brightness_min is not None
|
||||
and self.brightness_max_remap is not None
|
||||
and self.brightness_min_remap is not None
|
||||
and (brightness_max := device.status.get(self.brightness_max.dpcode))
|
||||
is not None
|
||||
and (brightness_min := device.status.get(self.brightness_min.dpcode))
|
||||
is not None
|
||||
):
|
||||
# Remap values onto our scale
|
||||
brightness_max = self.brightness_max.type_information.remap_value_to(
|
||||
brightness_max
|
||||
)
|
||||
brightness_min = self.brightness_min.type_information.remap_value_to(
|
||||
brightness_min
|
||||
)
|
||||
brightness_max = self.brightness_max_remap.remap_value_to(brightness_max)
|
||||
brightness_min = self.brightness_min_remap.remap_value_to(brightness_min)
|
||||
|
||||
# Remap the brightness value from our 0-255 scale to their min-max
|
||||
value = remap_value(value, to_min=brightness_min, to_max=brightness_max)
|
||||
return round(self.type_information.remap_value_from(value))
|
||||
value = RemapHelper.remap_value(
|
||||
value,
|
||||
from_min=0,
|
||||
from_max=255,
|
||||
to_min=brightness_min,
|
||||
to_max=brightness_max,
|
||||
)
|
||||
return round(self._remap_helper.remap_value_from(value))
|
||||
|
||||
|
||||
class _ColorTempWrapper(DPCodeIntegerWrapper):
|
||||
"""Wrapper for color temperature DP code."""
|
||||
|
||||
def __init__(self, dpcode: str, type_information: IntegerTypeInformation) -> None:
|
||||
"""Init DPCodeIntegerWrapper."""
|
||||
super().__init__(dpcode, type_information)
|
||||
self._remap_helper = RemapHelper.from_type_information(
|
||||
type_information, MIN_MIREDS, MAX_MIREDS
|
||||
)
|
||||
|
||||
def read_device_status(self, device: CustomerDevice) -> Any | None:
|
||||
"""Return the color temperature value in Kelvin."""
|
||||
if (temperature := device.status.get(self.dpcode)) is None:
|
||||
return None
|
||||
|
||||
return color_util.color_temperature_mired_to_kelvin(
|
||||
self.type_information.remap_value_to(
|
||||
temperature,
|
||||
MIN_MIREDS,
|
||||
MAX_MIREDS,
|
||||
reverse=True,
|
||||
)
|
||||
self._remap_helper.remap_value_to(temperature, reverse=True)
|
||||
)
|
||||
|
||||
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
|
||||
"""Convert a Home Assistant value (Kelvin) back to a raw device value."""
|
||||
return round(
|
||||
self.type_information.remap_value_from(
|
||||
color_util.color_temperature_kelvin_to_mired(value),
|
||||
MIN_MIREDS,
|
||||
MAX_MIREDS,
|
||||
reverse=True,
|
||||
self._remap_helper.remap_value_from(
|
||||
color_util.color_temperature_kelvin_to_mired(value), reverse=True
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
DEFAULT_H_TYPE = IntegerTypeInformation(
|
||||
dpcode=DPCode.COLOUR_DATA_HSV, min=1, scale=0, max=360, step=1
|
||||
)
|
||||
DEFAULT_S_TYPE = IntegerTypeInformation(
|
||||
dpcode=DPCode.COLOUR_DATA_HSV, min=1, scale=0, max=255, step=1
|
||||
)
|
||||
DEFAULT_V_TYPE = IntegerTypeInformation(
|
||||
dpcode=DPCode.COLOUR_DATA_HSV, min=1, scale=0, max=255, step=1
|
||||
)
|
||||
DEFAULT_H_TYPE = RemapHelper(source_min=1, source_max=360, target_min=0, target_max=360)
|
||||
DEFAULT_S_TYPE = RemapHelper(source_min=1, source_max=255, target_min=0, target_max=100)
|
||||
DEFAULT_V_TYPE = RemapHelper(source_min=1, source_max=255, target_min=0, target_max=255)
|
||||
|
||||
|
||||
DEFAULT_H_TYPE_V2 = IntegerTypeInformation(
|
||||
dpcode=DPCode.COLOUR_DATA_HSV, min=1, scale=0, max=360, step=1
|
||||
DEFAULT_H_TYPE_V2 = RemapHelper(
|
||||
source_min=1, source_max=360, target_min=0, target_max=360
|
||||
)
|
||||
DEFAULT_S_TYPE_V2 = IntegerTypeInformation(
|
||||
dpcode=DPCode.COLOUR_DATA_HSV, min=1, scale=0, max=1000, step=1
|
||||
DEFAULT_S_TYPE_V2 = RemapHelper(
|
||||
source_min=1, source_max=1000, target_min=0, target_max=100
|
||||
)
|
||||
DEFAULT_V_TYPE_V2 = IntegerTypeInformation(
|
||||
dpcode=DPCode.COLOUR_DATA_HSV, min=1, scale=0, max=1000, step=1
|
||||
DEFAULT_V_TYPE_V2 = RemapHelper(
|
||||
source_min=1, source_max=1000, target_min=0, target_max=255
|
||||
)
|
||||
|
||||
|
||||
@@ -172,23 +178,23 @@ class _ColorDataWrapper(DPCodeJsonWrapper):
|
||||
if (status := self.read_device_status(device)) is None:
|
||||
return None
|
||||
return (
|
||||
self.h_type.remap_value_to(cast(int, status["h"]), 0, 360),
|
||||
self.s_type.remap_value_to(cast(int, status["s"]), 0, 100),
|
||||
self.h_type.remap_value_to(status["h"]),
|
||||
self.s_type.remap_value_to(status["s"]),
|
||||
)
|
||||
|
||||
def read_brightness(self, device: CustomerDevice) -> int | None:
|
||||
"""Get the brightness value from this color data."""
|
||||
if (status := self.read_device_status(device)) is None:
|
||||
return None
|
||||
return round(self.v_type.remap_value_to(cast(int, status["v"]), 0, 255))
|
||||
return round(self.v_type.remap_value_to(status["v"]))
|
||||
|
||||
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
|
||||
"""Convert a Home Assistant color/brightness pair back to a raw device value."""
|
||||
color, brightness = value
|
||||
return json.dumps(
|
||||
{
|
||||
"h": round(self.h_type.remap_value_from(color[0], 0, 360)),
|
||||
"s": round(self.s_type.remap_value_from(color[1], 0, 100)),
|
||||
"h": round(self.h_type.remap_value_from(color[0])),
|
||||
"s": round(self.s_type.remap_value_from(color[1])),
|
||||
"v": round(self.v_type.remap_value_from(brightness)),
|
||||
}
|
||||
)
|
||||
@@ -545,12 +551,20 @@ def _get_brightness_wrapper(
|
||||
)
|
||||
) is None:
|
||||
return None
|
||||
brightness_wrapper.brightness_max = DPCodeIntegerWrapper.find_dpcode(
|
||||
if brightness_max := DPCodeIntegerWrapper.find_dpcode(
|
||||
device, description.brightness_max, prefer_function=True
|
||||
)
|
||||
brightness_wrapper.brightness_min = DPCodeIntegerWrapper.find_dpcode(
|
||||
):
|
||||
brightness_wrapper.brightness_max = brightness_max
|
||||
brightness_wrapper.brightness_max_remap = RemapHelper.from_type_information(
|
||||
brightness_max.type_information, 0, 255
|
||||
)
|
||||
if brightness_min := DPCodeIntegerWrapper.find_dpcode(
|
||||
device, description.brightness_min, prefer_function=True
|
||||
)
|
||||
):
|
||||
brightness_wrapper.brightness_min = brightness_min
|
||||
brightness_wrapper.brightness_min_remap = RemapHelper.from_type_information(
|
||||
brightness_min.type_information, 0, 255
|
||||
)
|
||||
return brightness_wrapper
|
||||
|
||||
|
||||
@@ -568,19 +582,16 @@ def _get_color_data_wrapper(
|
||||
|
||||
# Fetch color data type information
|
||||
if function_data := json_loads_object(
|
||||
cast(str, color_data_wrapper.type_information.type_data)
|
||||
color_data_wrapper.type_information.type_data
|
||||
):
|
||||
color_data_wrapper.h_type = IntegerTypeInformation(
|
||||
dpcode=color_data_wrapper.dpcode,
|
||||
**cast(dict, function_data["h"]),
|
||||
color_data_wrapper.h_type = RemapHelper.from_function_data(
|
||||
cast(dict, function_data["h"]), 0, 360
|
||||
)
|
||||
color_data_wrapper.s_type = IntegerTypeInformation(
|
||||
dpcode=color_data_wrapper.dpcode,
|
||||
**cast(dict, function_data["s"]),
|
||||
color_data_wrapper.s_type = RemapHelper.from_function_data(
|
||||
cast(dict, function_data["s"]), 0, 100
|
||||
)
|
||||
color_data_wrapper.v_type = IntegerTypeInformation(
|
||||
dpcode=color_data_wrapper.dpcode,
|
||||
**cast(dict, function_data["v"]),
|
||||
color_data_wrapper.v_type = RemapHelper.from_function_data(
|
||||
cast(dict, function_data["v"]), 0, 255
|
||||
)
|
||||
elif (
|
||||
description.fallback_color_data_mode == FallbackColorDataMode.V2
|
||||
|
||||
@@ -11,7 +11,7 @@ from tuya_sharing import CustomerDevice
|
||||
from homeassistant.util.json import json_loads_object
|
||||
|
||||
from .const import LOGGER, DPType
|
||||
from .util import parse_dptype, remap_value
|
||||
from .util import parse_dptype
|
||||
|
||||
# Dictionary to track logged warnings to avoid spamming logs
|
||||
# Keyed by device ID
|
||||
@@ -41,7 +41,7 @@ class TypeInformation[T]:
|
||||
|
||||
_DPTYPE: ClassVar[DPType]
|
||||
dpcode: str
|
||||
type_data: str | None = None
|
||||
type_data: str
|
||||
|
||||
def process_raw_value(
|
||||
self, raw_value: Any | None, device: CustomerDevice
|
||||
@@ -223,26 +223,6 @@ class IntegerTypeInformation(TypeInformation[float]):
|
||||
"""Return raw value for scaled."""
|
||||
return round(value * (10**self.scale))
|
||||
|
||||
def remap_value_to(
|
||||
self,
|
||||
value: float,
|
||||
to_min: float = 0,
|
||||
to_max: float = 255,
|
||||
reverse: bool = False,
|
||||
) -> float:
|
||||
"""Remap a value from this range to a new range."""
|
||||
return remap_value(value, self.min, self.max, to_min, to_max, reverse)
|
||||
|
||||
def remap_value_from(
|
||||
self,
|
||||
value: float,
|
||||
from_min: float = 0,
|
||||
from_max: float = 255,
|
||||
reverse: bool = False,
|
||||
) -> float:
|
||||
"""Remap a value from its current range to this range."""
|
||||
return remap_value(value, from_min, from_max, self.min, self.max, reverse)
|
||||
|
||||
def process_raw_value(
|
||||
self, raw_value: Any | None, device: CustomerDevice
|
||||
) -> float | None:
|
||||
|
||||
@@ -2,12 +2,18 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from tuya_sharing import CustomerDevice
|
||||
|
||||
from homeassistant.exceptions import ServiceValidationError
|
||||
|
||||
from .const import DOMAIN, DPCode, DPType
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .type_information import IntegerTypeInformation
|
||||
|
||||
_DPTYPE_MAPPING: dict[str, DPType] = {
|
||||
"bitmap": DPType.BITMAP,
|
||||
"bool": DPType.BOOLEAN,
|
||||
@@ -50,18 +56,78 @@ def parse_dptype(dptype: str) -> DPType | None:
|
||||
return _DPTYPE_MAPPING.get(dptype)
|
||||
|
||||
|
||||
def remap_value(
|
||||
value: float,
|
||||
from_min: float = 0,
|
||||
from_max: float = 255,
|
||||
to_min: float = 0,
|
||||
to_max: float = 255,
|
||||
reverse: bool = False,
|
||||
) -> float:
|
||||
"""Remap a value from its current range, to a new range."""
|
||||
if reverse:
|
||||
value = from_max - value + from_min
|
||||
return ((value - from_min) / (from_max - from_min)) * (to_max - to_min) + to_min
|
||||
@dataclass(kw_only=True)
|
||||
class RemapHelper:
|
||||
"""Helper class for remapping values."""
|
||||
|
||||
source_min: int
|
||||
source_max: int
|
||||
target_min: int
|
||||
target_max: int
|
||||
|
||||
@classmethod
|
||||
def from_type_information(
|
||||
cls,
|
||||
type_information: IntegerTypeInformation,
|
||||
target_min: int,
|
||||
target_max: int,
|
||||
) -> RemapHelper:
|
||||
"""Create RemapHelper from IntegerTypeInformation."""
|
||||
return cls(
|
||||
source_min=type_information.min,
|
||||
source_max=type_information.max,
|
||||
target_min=target_min,
|
||||
target_max=target_max,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_function_data(
|
||||
cls, function_data: dict[str, Any], target_min: int, target_max: int
|
||||
) -> RemapHelper:
|
||||
"""Create RemapHelper from function_data."""
|
||||
return cls(
|
||||
source_min=function_data["min"],
|
||||
source_max=function_data["max"],
|
||||
target_min=target_min,
|
||||
target_max=target_max,
|
||||
)
|
||||
|
||||
def remap_value_to(self, value: float, *, reverse: bool = False) -> float:
|
||||
"""Remap a value from this range to a new range."""
|
||||
return self.remap_value(
|
||||
value,
|
||||
self.source_min,
|
||||
self.source_max,
|
||||
self.target_min,
|
||||
self.target_max,
|
||||
reverse=reverse,
|
||||
)
|
||||
|
||||
def remap_value_from(self, value: float, *, reverse: bool = False) -> float:
|
||||
"""Remap a value from its current range to this range."""
|
||||
return self.remap_value(
|
||||
value,
|
||||
self.target_min,
|
||||
self.target_max,
|
||||
self.source_min,
|
||||
self.source_max,
|
||||
reverse=reverse,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def remap_value(
|
||||
value: float,
|
||||
from_min: float,
|
||||
from_max: float,
|
||||
to_min: float,
|
||||
to_max: float,
|
||||
*,
|
||||
reverse: bool = False,
|
||||
) -> float:
|
||||
"""Remap a value from its current range, to a new range."""
|
||||
if reverse:
|
||||
value = from_max - value + from_min
|
||||
return ((value - from_min) / (from_max - from_min)) * (to_max - to_min) + to_min
|
||||
|
||||
|
||||
class ActionDPCodeNotFoundError(ServiceValidationError):
|
||||
|
||||
Reference in New Issue
Block a user