mirror of
https://github.com/home-assistant/core.git
synced 2026-02-15 07:36:16 +00:00
Normalize unique ID in WLED (#157901)
Co-authored-by: mik-laj <12058428+mik-laj@users.noreply.github.com>
This commit is contained in:
@@ -2,6 +2,11 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from homeassistant.config_entries import SOURCE_IGNORE
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
@@ -13,8 +18,11 @@ from .coordinator import (
|
||||
WLEDConfigEntry,
|
||||
WLEDDataUpdateCoordinator,
|
||||
WLEDReleasesDataUpdateCoordinator,
|
||||
normalize_mac_address,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PLATFORMS = (
|
||||
Platform.BUTTON,
|
||||
Platform.LIGHT,
|
||||
@@ -63,3 +71,69 @@ async def async_unload_entry(hass: HomeAssistant, entry: WLEDConfigEntry) -> boo
|
||||
coordinator.unsub()
|
||||
|
||||
return unload_ok
|
||||
|
||||
|
||||
async def async_migrate_entry(
|
||||
hass: HomeAssistant, config_entry: WLEDConfigEntry
|
||||
) -> bool:
|
||||
"""Migrate old entry."""
|
||||
_LOGGER.debug(
|
||||
"Migrating configuration from version %s.%s",
|
||||
config_entry.version,
|
||||
config_entry.minor_version,
|
||||
)
|
||||
|
||||
if config_entry.version > 1:
|
||||
# The user has downgraded from a future version
|
||||
return False
|
||||
|
||||
if config_entry.version == 1:
|
||||
if config_entry.minor_version < 2:
|
||||
# 1.2: Normalize unique ID to be lowercase MAC address without separators.
|
||||
# This matches the format used by WLED firmware.
|
||||
if TYPE_CHECKING:
|
||||
assert config_entry.unique_id
|
||||
normalized_mac_address = normalize_mac_address(config_entry.unique_id)
|
||||
duplicate_entries = [
|
||||
entry
|
||||
for entry in hass.config_entries.async_entries(DOMAIN)
|
||||
if entry.unique_id
|
||||
and normalize_mac_address(entry.unique_id) == normalized_mac_address
|
||||
]
|
||||
ignored_entries = [
|
||||
entry
|
||||
for entry in duplicate_entries
|
||||
if entry.entry_id != config_entry.entry_id
|
||||
and entry.source == SOURCE_IGNORE
|
||||
]
|
||||
if ignored_entries:
|
||||
_LOGGER.info(
|
||||
"Found %d ignored WLED config entries with the same MAC address, removing them",
|
||||
len(ignored_entries),
|
||||
)
|
||||
await asyncio.gather(
|
||||
*[
|
||||
hass.config_entries.async_remove(entry.entry_id)
|
||||
for entry in ignored_entries
|
||||
]
|
||||
)
|
||||
if len(duplicate_entries) - len(ignored_entries) > 1:
|
||||
_LOGGER.warning(
|
||||
"Found multiple WLED config entries with the same MAC address, cannot migrate to version 1.2"
|
||||
)
|
||||
return False
|
||||
|
||||
hass.config_entries.async_update_entry(
|
||||
config_entry,
|
||||
unique_id=normalized_mac_address,
|
||||
version=1,
|
||||
minor_version=2,
|
||||
)
|
||||
|
||||
_LOGGER.debug(
|
||||
"Migration to configuration version %s.%s successful",
|
||||
config_entry.version,
|
||||
config_entry.minor_version,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
@@ -22,7 +22,7 @@ from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
|
||||
|
||||
from .const import CONF_KEEP_MAIN_LIGHT, DEFAULT_KEEP_MAIN_LIGHT, DOMAIN
|
||||
from .coordinator import WLEDConfigEntry
|
||||
from .coordinator import WLEDConfigEntry, normalize_mac_address
|
||||
|
||||
|
||||
def _normalize_host(host: str) -> str:
|
||||
@@ -38,6 +38,7 @@ class WLEDFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a WLED config flow."""
|
||||
|
||||
VERSION = 1
|
||||
MINOR_VERSION = 2
|
||||
discovered_host: str
|
||||
discovered_device: Device
|
||||
|
||||
@@ -64,16 +65,15 @@ class WLEDFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
except WLEDConnectionError:
|
||||
errors["base"] = "cannot_connect"
|
||||
else:
|
||||
await self.async_set_unique_id(
|
||||
device.info.mac_address, raise_on_progress=False
|
||||
)
|
||||
mac_address = normalize_mac_address(device.info.mac_address)
|
||||
await self.async_set_unique_id(mac_address, raise_on_progress=False)
|
||||
if self.source == SOURCE_RECONFIGURE:
|
||||
entry = self._get_reconfigure_entry()
|
||||
self._abort_if_unique_id_mismatch(
|
||||
reason="unique_id_mismatch",
|
||||
description_placeholders={
|
||||
"expected_mac": format_mac(entry.unique_id).upper(),
|
||||
"actual_mac": format_mac(self.unique_id).upper(),
|
||||
"actual_mac": mac_address.upper(),
|
||||
},
|
||||
)
|
||||
return self.async_update_reload_and_abort(
|
||||
@@ -111,7 +111,7 @@ class WLEDFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle zeroconf discovery."""
|
||||
# Abort quick if the mac address is provided by discovery info
|
||||
if mac := discovery_info.properties.get(CONF_MAC):
|
||||
await self.async_set_unique_id(mac)
|
||||
await self.async_set_unique_id(normalize_mac_address(mac))
|
||||
self._abort_if_unique_id_configured(
|
||||
updates={CONF_HOST: discovery_info.host}
|
||||
)
|
||||
@@ -124,7 +124,10 @@ class WLEDFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
except WLEDConnectionError:
|
||||
return self.async_abort(reason="cannot_connect")
|
||||
|
||||
await self.async_set_unique_id(self.discovered_device.info.mac_address)
|
||||
device_mac_address = normalize_mac_address(
|
||||
self.discovered_device.info.mac_address
|
||||
)
|
||||
await self.async_set_unique_id(device_mac_address)
|
||||
self._abort_if_unique_id_configured(updates={CONF_HOST: discovery_info.host})
|
||||
|
||||
self.context.update(
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from wled import (
|
||||
WLED,
|
||||
Device as WLEDDevice,
|
||||
@@ -32,6 +34,17 @@ from .const import (
|
||||
type WLEDConfigEntry = ConfigEntry[WLEDDataUpdateCoordinator]
|
||||
|
||||
|
||||
def normalize_mac_address(mac: str) -> str:
|
||||
"""Normalize a MAC address to lowercase without separators.
|
||||
|
||||
This format is used by WLED firmware as well as unique IDs in Home Assistant.
|
||||
|
||||
The homeassistant.helpers.device_registry.format_mac function is preferred but
|
||||
returns MAC addresses with colons as separators.
|
||||
"""
|
||||
return mac.lower().replace(":", "").replace(".", "").replace("-", "").strip()
|
||||
|
||||
|
||||
class WLEDDataUpdateCoordinator(DataUpdateCoordinator[WLEDDevice]):
|
||||
"""Class to manage fetching WLED data from single endpoint."""
|
||||
|
||||
@@ -51,6 +64,10 @@ class WLEDDataUpdateCoordinator(DataUpdateCoordinator[WLEDDevice]):
|
||||
self.wled = WLED(entry.data[CONF_HOST], session=async_get_clientsession(hass))
|
||||
self.unsub: CALLBACK_TYPE | None = None
|
||||
|
||||
if TYPE_CHECKING:
|
||||
assert entry.unique_id
|
||||
self.config_mac_address = normalize_mac_address(entry.unique_id)
|
||||
|
||||
super().__init__(
|
||||
hass,
|
||||
LOGGER,
|
||||
@@ -131,13 +148,14 @@ class WLEDDataUpdateCoordinator(DataUpdateCoordinator[WLEDDevice]):
|
||||
translation_placeholders={"error": str(error)},
|
||||
) from error
|
||||
|
||||
if device.info.mac_address != self.config_entry.unique_id:
|
||||
device_mac_address = normalize_mac_address(device.info.mac_address)
|
||||
if device_mac_address != self.config_mac_address:
|
||||
raise ConfigEntryError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="mac_address_mismatch",
|
||||
translation_placeholders={
|
||||
"expected_mac": format_mac(self.config_entry.unique_id).upper(),
|
||||
"actual_mac": format_mac(device.info.mac_address).upper(),
|
||||
"expected_mac": format_mac(self.config_mac_address).upper(),
|
||||
"actual_mac": format_mac(device_mac_address).upper(),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ def mock_config_entry() -> MockConfigEntry:
|
||||
domain=DOMAIN,
|
||||
data={CONF_HOST: "192.168.1.123"},
|
||||
unique_id="aabbccddeeff",
|
||||
minor_version=2,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -292,12 +292,15 @@ async def test_zeroconf_unsupported_version_error(
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_wled")
|
||||
@pytest.mark.parametrize("device_mac", ["aabbccddeeff", "AABBCCDDEEFF"])
|
||||
async def test_user_device_exists_abort(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_wled: MagicMock,
|
||||
device_mac: str,
|
||||
) -> None:
|
||||
"""Test we abort zeroconf flow if WLED device already configured."""
|
||||
mock_wled.update.return_value.info.mac_address = device_mac
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
@@ -334,10 +337,12 @@ async def test_zeroconf_without_mac_device_exists_abort(
|
||||
assert result.get("reason") == "already_configured"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("device_mac", ["aabbccddeeff", "AABBCCDDEEFF"])
|
||||
async def test_zeroconf_with_mac_device_exists_abort(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_wled: MagicMock,
|
||||
device_mac: str,
|
||||
) -> None:
|
||||
"""Test we abort zeroconf flow if WLED device already configured."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
@@ -350,7 +355,7 @@ async def test_zeroconf_with_mac_device_exists_abort(
|
||||
hostname="example.local.",
|
||||
name="mock_name",
|
||||
port=None,
|
||||
properties={CONF_MAC: "aabbccddeeff"},
|
||||
properties={CONF_MAC: device_mac},
|
||||
type="mock_type",
|
||||
),
|
||||
)
|
||||
|
||||
@@ -7,7 +7,9 @@ from unittest.mock import AsyncMock, MagicMock, patch
|
||||
import pytest
|
||||
from wled import WLEDConnectionError
|
||||
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.components.wled.const import DOMAIN
|
||||
from homeassistant.config_entries import SOURCE_IGNORE, ConfigEntryState
|
||||
from homeassistant.const import CONF_HOST
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
@@ -61,9 +63,151 @@ async def test_config_entry_not_ready(
|
||||
assert mock_config_entry.state is ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
async def test_setting_unique_id(
|
||||
hass: HomeAssistant, init_integration: MockConfigEntry
|
||||
@pytest.fixture
|
||||
def config_entry_v1() -> MockConfigEntry:
|
||||
"""Return a WLED config entry at version 1.0 with a specific MAC."""
|
||||
return MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_HOST: "192.168.1.123"},
|
||||
unique_id="AABBCCDDEEFF",
|
||||
minor_version=1,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_setup_entry", "mock_wled")
|
||||
async def test_migrate_entry_future_version_is_downgrade(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Test we set unique ID if not set yet."""
|
||||
assert init_integration.runtime_data
|
||||
assert init_integration.unique_id == "aabbccddeeff"
|
||||
"""Return False when user downgraded from a future version."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
title="WLED Future",
|
||||
unique_id="AABBCCDDEEFF",
|
||||
version=2,
|
||||
minor_version=0,
|
||||
data={CONF_HOST: "wled.local"},
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result is False
|
||||
assert entry.state == ConfigEntryState.MIGRATION_ERROR
|
||||
assert entry.version == 2
|
||||
assert entry.minor_version == 0
|
||||
assert entry.unique_id == "AABBCCDDEEFF"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_setup_entry", "mock_wled")
|
||||
async def test_migrate_entry_v1_to_1_2_no_duplicates(
|
||||
hass: HomeAssistant, config_entry_v1: MockConfigEntry
|
||||
) -> None:
|
||||
"""Migrate from 1.x to 1.2 when there are no other entries with same MAC."""
|
||||
config_entry_v1.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.async_setup(config_entry_v1.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result is True
|
||||
assert config_entry_v1.state == ConfigEntryState.LOADED
|
||||
assert config_entry_v1.version == 1
|
||||
assert config_entry_v1.minor_version == 2
|
||||
assert config_entry_v1.unique_id == "aabbccddeeff"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_setup_entry", "mock_wled")
|
||||
async def test_migrate_entry_v1_with_ignored_duplicates(
|
||||
hass: HomeAssistant, config_entry_v1: MockConfigEntry
|
||||
) -> None:
|
||||
"""Remove ignored entries with the same MAC and then migrate."""
|
||||
config_entry_v1.add_to_hass(hass)
|
||||
|
||||
ignored_1 = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
title="Ignored 1",
|
||||
unique_id="aabbccddeeff",
|
||||
source=SOURCE_IGNORE,
|
||||
version=1,
|
||||
minor_version=0,
|
||||
data={"host": "wled-ignored-1.local"},
|
||||
)
|
||||
ignored_2 = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
title="Ignored 2",
|
||||
unique_id="aabbccddeeff",
|
||||
source=SOURCE_IGNORE,
|
||||
version=1,
|
||||
minor_version=0,
|
||||
data={"host": "wled-ignored-2.local"},
|
||||
)
|
||||
|
||||
ignored_1.add_to_hass(hass)
|
||||
ignored_2.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.async_setup(config_entry_v1.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result is True
|
||||
assert config_entry_v1.state == ConfigEntryState.LOADED
|
||||
assert config_entry_v1.version == 1
|
||||
assert config_entry_v1.minor_version == 2
|
||||
assert config_entry_v1.unique_id == "aabbccddeeff"
|
||||
|
||||
assert ignored_1.state is ConfigEntryState.NOT_LOADED
|
||||
assert ignored_2.state is ConfigEntryState.NOT_LOADED
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_setup_entry", "mock_wled")
|
||||
async def test_migrate_entry_v1_with_non_ignored_duplicate_aborts(
|
||||
hass: HomeAssistant,
|
||||
config_entry_v1: MockConfigEntry,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Abort migration when there is another non-ignored entry with the same MAC."""
|
||||
config_entry_v1.add_to_hass(hass)
|
||||
|
||||
duplicate_active = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
title="Active duplicate",
|
||||
unique_id="aabbccddeeff",
|
||||
version=1,
|
||||
minor_version=0,
|
||||
data={"host": "wled-duplicate.local"},
|
||||
)
|
||||
duplicate_active.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.async_setup(config_entry_v1.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result is False
|
||||
assert config_entry_v1.state == ConfigEntryState.MIGRATION_ERROR
|
||||
assert config_entry_v1.version == 1
|
||||
assert config_entry_v1.minor_version == 1
|
||||
assert config_entry_v1.unique_id == "AABBCCDDEEFF"
|
||||
assert "multiple WLED config entries with the same MAC address" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_setup_entry", "mock_wled")
|
||||
async def test_migrate_entry_already_at_1_2_is_noop(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Do nothing when entry is already at version 1.2."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
title="WLED Already 1.2",
|
||||
unique_id="aabbccddeeff",
|
||||
version=1,
|
||||
minor_version=2,
|
||||
data={"host": "wled.local"},
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result is True
|
||||
assert entry.state == ConfigEntryState.LOADED
|
||||
assert entry.version == 1
|
||||
assert entry.minor_version == 2
|
||||
assert entry.unique_id == "aabbccddeeff"
|
||||
|
||||
Reference in New Issue
Block a user