1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-24 12:59:34 +00:00

Teach long term statistics that unit 'rpm' is same as 'RPM' (#80012)

* Teach long term statistics that unit 'rpm' is same as 'RPM'

* Add tests
This commit is contained in:
Erik Montnemery
2022-10-11 10:32:01 +02:00
committed by GitHub
parent edad6d0f26
commit 69d935b7bd
2 changed files with 217 additions and 4 deletions

View File

@@ -2192,6 +2192,203 @@ def test_compile_hourly_statistics_changing_units_3(
assert "Error while processing event StatisticsTask" not in caplog.text
@pytest.mark.parametrize(
"device_class, state_unit, state_unit2, unit_class, mean, mean2, min, max",
[
(None, "RPM", "rpm", None, 13.050847, 13.333333, -10, 30),
(None, "rpm", "RPM", None, 13.050847, 13.333333, -10, 30),
],
)
def test_compile_hourly_statistics_equivalent_units_1(
hass_recorder,
caplog,
device_class,
state_unit,
state_unit2,
unit_class,
mean,
mean2,
min,
max,
):
"""Test compiling hourly statistics where units change from one hour to the next."""
zero = dt_util.utcnow()
hass = hass_recorder()
setup_component(hass, "sensor", {})
wait_recording_done(hass) # Wait for the sensor recorder platform to be added
attributes = {
"device_class": device_class,
"state_class": "measurement",
"unit_of_measurement": state_unit,
}
four, states = record_states(hass, zero, "sensor.test1", attributes)
attributes["unit_of_measurement"] = state_unit2
four, _states = record_states(
hass, zero + timedelta(minutes=5), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"]
four, _states = record_states(
hass, zero + timedelta(minutes=10), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
do_adhoc_statistics(hass, start=zero)
wait_recording_done(hass)
assert "can not be converted to the unit of previously" not in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{
"statistic_id": "sensor.test1",
"has_mean": True,
"has_sum": False,
"name": None,
"source": "recorder",
"statistics_unit_of_measurement": state_unit,
"unit_class": unit_class,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
"sensor.test1": [
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(zero),
"end": process_timestamp_to_utc_isoformat(zero + timedelta(minutes=5)),
"mean": approx(mean),
"min": approx(min),
"max": approx(max),
"last_reset": None,
"state": None,
"sum": None,
}
]
}
do_adhoc_statistics(hass, start=zero + timedelta(minutes=10))
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{
"statistic_id": "sensor.test1",
"has_mean": True,
"has_sum": False,
"name": None,
"source": "recorder",
"statistics_unit_of_measurement": state_unit2,
"unit_class": unit_class,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
"sensor.test1": [
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(zero),
"end": process_timestamp_to_utc_isoformat(zero + timedelta(minutes=5)),
"mean": approx(mean),
"min": approx(min),
"max": approx(max),
"last_reset": None,
"state": None,
"sum": None,
},
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(
zero + timedelta(minutes=10)
),
"end": process_timestamp_to_utc_isoformat(zero + timedelta(minutes=15)),
"mean": approx(mean2),
"min": approx(min),
"max": approx(max),
"last_reset": None,
"state": None,
"sum": None,
},
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
@pytest.mark.parametrize(
"device_class, state_unit, state_unit2, unit_class, mean, min, max",
[
(None, "RPM", "rpm", None, 13.333333, -10, 30),
(None, "rpm", "RPM", None, 13.333333, -10, 30),
],
)
def test_compile_hourly_statistics_equivalent_units_2(
hass_recorder,
caplog,
device_class,
state_unit,
state_unit2,
unit_class,
mean,
min,
max,
):
"""Test compiling hourly statistics where units change during an hour."""
zero = dt_util.utcnow()
hass = hass_recorder()
setup_component(hass, "sensor", {})
wait_recording_done(hass) # Wait for the sensor recorder platform to be added
attributes = {
"device_class": device_class,
"state_class": "measurement",
"unit_of_measurement": state_unit,
}
four, states = record_states(hass, zero, "sensor.test1", attributes)
attributes["unit_of_measurement"] = state_unit2
four, _states = record_states(
hass, zero + timedelta(minutes=5), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
do_adhoc_statistics(hass, start=zero + timedelta(seconds=30 * 5))
wait_recording_done(hass)
assert "The unit of sensor.test1 is changing" not in caplog.text
assert "and matches the unit of already compiled statistics" not in caplog.text
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{
"statistic_id": "sensor.test1",
"has_mean": True,
"has_sum": False,
"name": None,
"source": "recorder",
"statistics_unit_of_measurement": state_unit,
"unit_class": unit_class,
},
]
stats = statistics_during_period(hass, zero, period="5minute")
assert stats == {
"sensor.test1": [
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(
zero + timedelta(seconds=30 * 5)
),
"end": process_timestamp_to_utc_isoformat(
zero + timedelta(seconds=30 * 15)
),
"mean": approx(mean),
"min": approx(min),
"max": approx(max),
"last_reset": None,
"state": None,
"sum": None,
},
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
@pytest.mark.parametrize(
"device_class, state_unit, statistic_unit, unit_class, mean1, mean2, min, max",
[