1
0
mirror of https://github.com/home-assistant/core.git synced 2026-04-02 00:20:30 +01:00

Improve discovery flow for Squeezebox (#153958)

This commit is contained in:
peteS-UK
2026-03-16 12:50:33 +00:00
committed by GitHub
parent 1fb59c9f11
commit 960666e15b
5 changed files with 713 additions and 514 deletions

View File

@@ -33,6 +33,7 @@ from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
from .const import (
CONF_BROWSE_LIMIT,
CONF_HTTPS,
CONF_SERVER_LIST,
CONF_VOLUME_STEP,
DEFAULT_BROWSE_LIMIT,
DEFAULT_PORT,
@@ -45,45 +46,23 @@ _LOGGER = logging.getLogger(__name__)
TIMEOUT = 5
def _base_schema(
discovery_info: dict[str, Any] | None = None,
) -> vol.Schema:
"""Generate base schema."""
base_schema: dict[Any, Any] = {}
if discovery_info and CONF_HOST in discovery_info:
base_schema.update(
{
vol.Required(
CONF_HOST,
description={"suggested_value": discovery_info[CONF_HOST]},
): str,
}
)
else:
base_schema.update({vol.Required(CONF_HOST): str})
FULL_EDIT_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
vol.Optional(CONF_USERNAME): str,
vol.Optional(CONF_PASSWORD): str,
vol.Optional(CONF_HTTPS, default=False): bool,
}
)
if discovery_info and CONF_PORT in discovery_info:
base_schema.update(
{
vol.Required(
CONF_PORT,
default=DEFAULT_PORT,
description={"suggested_value": discovery_info[CONF_PORT]},
): int,
}
)
else:
base_schema.update({vol.Required(CONF_PORT, default=DEFAULT_PORT): int})
base_schema.update(
{
vol.Optional(CONF_USERNAME): str,
vol.Optional(CONF_PASSWORD): str,
vol.Optional(CONF_HTTPS, default=False): bool,
}
)
return vol.Schema(base_schema)
SHORT_EDIT_SCHEMA = vol.Schema(
{
vol.Optional(CONF_USERNAME): str,
vol.Optional(CONF_PASSWORD): str,
vol.Optional(CONF_HTTPS, default=False): bool,
}
)
class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
@@ -93,8 +72,9 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
def __init__(self) -> None:
"""Initialize an instance of the squeezebox config flow."""
self.data_schema = _base_schema()
self.discovery_info: dict[str, Any] | None = None
self.discovery_task: asyncio.Task | None = None
self.discovered_servers: list[dict[str, Any]] = []
self.chosen_server: dict[str, Any] = {}
@staticmethod
@callback
@@ -102,34 +82,43 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
"""Get the options flow for this handler."""
return OptionsFlowHandler()
async def _discover(self, uuid: str | None = None) -> None:
async def _discover(self) -> None:
"""Discover an unconfigured LMS server."""
self.discovery_info = None
discovery_event = asyncio.Event()
# Reset discovery state to avoid stale or duplicate servers across runs
self.discovered_servers = []
self.chosen_server = {}
_discovery_task: asyncio.Task | None = None
def _discovery_callback(server: Server) -> None:
_discovery_info: dict[str, Any] | None = {}
if server.uuid:
# ignore already configured uuids
for entry in self._async_current_entries():
if entry.unique_id == server.uuid:
return
self.discovery_info = {
_discovery_info = {
CONF_HOST: server.host,
CONF_PORT: int(server.port),
"uuid": server.uuid,
"name": server.name,
}
_LOGGER.debug("Discovered server: %s", self.discovery_info)
discovery_event.set()
discovery_task = self.hass.async_create_task(
_LOGGER.debug(
"Discovered server: %s, creating discovery_info %s",
server,
_discovery_info,
)
if _discovery_info not in self.discovered_servers:
self.discovered_servers.append(_discovery_info)
_discovery_task = self.hass.async_create_task(
async_discover(_discovery_callback)
)
await discovery_event.wait()
discovery_task.cancel() # stop searching as soon as we find server
await asyncio.sleep(TIMEOUT)
# update with suggested values from discovery
self.data_schema = _base_schema(self.discovery_info)
_LOGGER.debug("Discovered Servers %s", self.discovered_servers)
_discovery_task.cancel()
async def _validate_input(self, data: dict[str, Any]) -> str | None:
"""Validate the user input allows us to connect.
@@ -142,7 +131,7 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
data[CONF_PORT],
data.get(CONF_USERNAME),
data.get(CONF_PASSWORD),
https=data[CONF_HTTPS],
https=data.get(CONF_HTTPS, False),
)
try:
@@ -164,35 +153,78 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
return None
async def async_step_choose_server(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Choose manual or discover flow."""
_chosen_host: str
if user_input:
_chosen_host = user_input[CONF_SERVER_LIST]
for _server in self.discovered_servers:
if _chosen_host == _server[CONF_HOST]:
self.chosen_server[CONF_HOST] = _chosen_host
self.chosen_server[CONF_PORT] = _server[CONF_PORT]
self.chosen_server[CONF_HTTPS] = False
return await self.async_step_edit_discovered()
_options = {
_server[CONF_HOST]: f"{_server['name']} ({_server[CONF_HOST]})"
for _server in self.discovered_servers
}
return self.async_show_form(
step_id="choose_server",
data_schema=vol.Schema({vol.Required(CONF_SERVER_LIST): vol.In(_options)}),
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
errors = {}
if user_input and CONF_HOST in user_input:
# update with host provided by user
self.data_schema = _base_schema(user_input)
return await self.async_step_edit()
# no host specified, see if we can discover an unconfigured LMS server
try:
async with asyncio.timeout(TIMEOUT):
await self._discover()
return await self.async_step_edit()
except TimeoutError:
errors["base"] = "no_server_found"
return self.async_show_menu(
step_id="user", menu_options=["start_discovery", "edit"]
)
# display the form
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Optional(CONF_HOST): str}),
errors=errors,
async def async_step_discovery_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a failed discovery."""
return self.async_show_menu(step_id="discovery_failed", menu_options=["edit"])
async def async_step_start_discovery(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
if not self.discovery_task:
self.discovery_task = self.hass.async_create_task(self._discover())
if self.discovery_task.done():
self.discovery_task.cancel()
self.discovery_task = None
# Sleep to allow task cancellation to complete
await asyncio.sleep(0.1)
return self.async_show_progress_done(
next_step_id="choose_server"
if self.discovered_servers
else "discovery_failed"
)
return self.async_show_progress(
step_id="start_discovery",
progress_action="start_discovery",
progress_task=self.discovery_task,
)
async def async_step_edit(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Edit a discovered or manually inputted server."""
errors = {}
if user_input:
error = await self._validate_input(user_input)
@@ -203,39 +235,95 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
errors["base"] = error
return self.async_show_form(
step_id="edit", data_schema=self.data_schema, errors=errors
step_id="edit",
data_schema=FULL_EDIT_SCHEMA,
errors=errors,
)
async def async_step_edit_discovered(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Edit a discovered or manually inputted server."""
if not (await self._validate_input(self.chosen_server)):
# Attempt to connect with default data successful
return self.async_create_entry(
title=self.chosen_server[CONF_HOST], data=self.chosen_server
)
errors = {}
if user_input:
user_input[CONF_HOST] = self.chosen_server[CONF_HOST]
user_input[CONF_PORT] = self.chosen_server[CONF_PORT]
error = await self._validate_input(user_input)
if not error:
return self.async_create_entry(
title=user_input[CONF_HOST], data=user_input
)
errors["base"] = error
return self.async_show_form(
step_id="edit_discovered",
description_placeholders={
"host": self.chosen_server[CONF_HOST],
"port": self.chosen_server[CONF_PORT],
},
data_schema=SHORT_EDIT_SCHEMA,
errors=errors,
)
async def async_step_edit_integration_discovered(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Edit a discovered or manually inputted server."""
errors = {}
if user_input:
user_input[CONF_HOST] = self.chosen_server[CONF_HOST]
user_input[CONF_PORT] = self.chosen_server[CONF_PORT]
error = await self._validate_input(user_input)
if not error:
return self.async_create_entry(
title=user_input[CONF_HOST], data=user_input
)
errors["base"] = error
return self.async_show_form(
step_id="edit_integration_discovered",
description_placeholders={
"desc": f"LMS Host: {self.chosen_server[CONF_HOST]}, Port: {self.chosen_server[CONF_PORT]}"
},
data_schema=SHORT_EDIT_SCHEMA,
errors=errors,
)
async def async_step_integration_discovery(
self, discovery_info: dict[str, Any]
self, _discovery_info: dict[str, Any]
) -> ConfigFlowResult:
"""Handle discovery of a server."""
_LOGGER.debug("Reached server discovery flow with info: %s", discovery_info)
if "uuid" in discovery_info:
await self.async_set_unique_id(discovery_info.pop("uuid"))
_LOGGER.debug("Reached server discovery flow with info: %s", _discovery_info)
if "uuid" in _discovery_info:
await self.async_set_unique_id(_discovery_info.pop("uuid"))
self._abort_if_unique_id_configured()
else:
# attempt to connect to server and determine uuid. will fail if
# password required
error = await self._validate_input(discovery_info)
error = await self._validate_input(_discovery_info)
if error:
await self._async_handle_discovery_without_unique_id()
# update schema with suggested values from discovery
self.data_schema = _base_schema(discovery_info)
self.context.update({"title_placeholders": {"host": discovery_info[CONF_HOST]}})
return await self.async_step_edit()
self.context.update(
{"title_placeholders": {"host": _discovery_info[CONF_HOST]}}
)
self.chosen_server = _discovery_info
return await self.async_step_edit_integration_discovered()
async def async_step_dhcp(
self, discovery_info: DhcpServiceInfo
self, _discovery_info: DhcpServiceInfo
) -> ConfigFlowResult:
"""Handle dhcp discovery of a Squeezebox player."""
_LOGGER.debug(
"Reached dhcp discovery of a player with info: %s", discovery_info
"Reached dhcp discovery of a player with info: %s", _discovery_info
)
await self.async_set_unique_id(format_mac(discovery_info.macaddress))
await self.async_set_unique_id(format_mac(_discovery_info.macaddress))
self._abort_if_unique_id_configured()
_LOGGER.debug("Configuring dhcp player with unique id: %s", self.unique_id)

View File

@@ -56,3 +56,4 @@ ATTR_VOLUME = "volume"
ATTR_URL = "url"
UPDATE_PLUGINS_RELEASE_SUMMARY = "update_plugins_release_summary"
UPDATE_RELEASE_SUMMARY = "update_release_summary"
CONF_SERVER_LIST = "server_list"

View File

@@ -2,6 +2,7 @@
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
"no_server_found": "No LMS found."
},
"error": {
@@ -12,7 +13,27 @@
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"flow_title": "{host}",
"progress": {
"start_discovery": "Attempting to discover new LMS servers\n\nThis will take about 5 seconds",
"title": "LMS discovery"
},
"step": {
"choose_server": {
"data": {
"server_list": "Server list"
},
"data_description": {
"server_list": "Choose the server to configure."
},
"title": "Discovered servers"
},
"discovery_failed": {
"description": "No LMS were discovered on the network.",
"menu_options": {
"edit": "Enter configuration manually"
},
"title": "Discovery failed"
},
"edit": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
@@ -22,21 +43,47 @@
"username": "[%key:common::config_flow::data::username%]"
},
"data_description": {
"host": "[%key:component::squeezebox::config::step::user::data_description::host%]",
"host": "The IP address or hostname of the LMS.",
"https": "Connect to the LMS over HTTPS (requires reverse proxy).",
"password": "The password from LMS Advanced Security (if defined).",
"port": "The web interface port on the LMS. The default is 9000.",
"username": "The username from LMS Advanced Security (if defined)."
},
"title": "Edit connection information"
}
},
"user": {
"edit_discovered": {
"data": {
"host": "[%key:common::config_flow::data::host%]"
"https": "Connect over HTTPS (requires reverse proxy)",
"password": "[%key:common::config_flow::data::password%]",
"username": "[%key:common::config_flow::data::username%]"
},
"data_description": {
"host": "The hostname or IP address of your Lyrion Music Server."
}
"https": "Connect to the LMS over HTTPS (requires reverse proxy).",
"password": "The password from LMS Advanced Security (if defined).",
"username": "The username from LMS Advanced Security (if defined)."
},
"description": "LMS Host: {host}, Port {port}",
"title": "Edit additional connection information"
},
"edit_integration_discovered": {
"data": {
"https": "Connect over HTTPS (requires reverse proxy)",
"password": "[%key:common::config_flow::data::password%]",
"username": "[%key:common::config_flow::data::username%]"
},
"data_description": {
"https": "Connect to the LMS over HTTPS (requires reverse proxy).",
"password": "The password from LMS Advanced Security (if defined).",
"username": "The username from LMS Advanced Security (if defined)."
},
"description": "{desc}",
"title": "Edit additional connection information"
},
"user": {
"menu_options": {
"edit": "Enter configuration manually",
"start_discovery": "Discover new LMS"
},
"title": "LMS configuration"
}
}
},
@@ -260,11 +307,11 @@
"description": "Calls a custom Squeezebox JSONRPC API.",
"fields": {
"command": {
"description": "Command to pass to Lyrion Music Server (p0 in the CLI documentation).",
"description": "Command to pass to LMS (p0 in the CLI documentation).",
"name": "Command"
},
"parameters": {
"description": "Array of additional parameters to pass to Lyrion Music Server (p1, ..., pN in the CLI documentation).",
"description": "Array of additional parameters to pass to LMS (p1, ..., pN in the CLI documentation).",
"name": "Parameters"
}
},

View File

@@ -143,43 +143,30 @@ def mock_server():
@pytest.fixture
def mock_discover_success():
"""Fixture to simulate successful async_discover."""
def mock_discover():
"""Fixture to mock async_discover with a default success state."""
async def _mock_discover(callback):
async def _mock_discover_success(callback):
class DummyServer:
host = "1.1.1.1"
port = 9000
uuid = SERVER_UUIDS[0] # Ensure UUID is defined or imported
uuid = SERVER_UUIDS[0]
name = "Test Server"
callback(DummyServer())
return [DummyServer()]
return _mock_discover
@pytest.fixture
def mock_discover_failure():
"""Simulate failed discovery without raising unhandled exceptions."""
async def _failed_discover(callback):
# Simulate no servers found, no callback triggered
return []
return _failed_discover
@pytest.fixture
def patch_discover():
"""Patch the async_discover function to prevent actual network calls."""
async def _mock_discover(callback):
return []
with patch(
"homeassistant.components.squeezebox.config_flow.async_discover",
side_effect=_mock_discover,
):
side_effect=_mock_discover_success,
) as mock:
yield mock
@pytest.fixture(autouse=True)
def mock_discover_timeout():
"""Mock the discovery timeout so tests run fast."""
with patch("homeassistant.components.squeezebox.config_flow.TIMEOUT", 0):
yield

View File

@@ -1,14 +1,16 @@
"""Test the Logitech Squeezebox config flow."""
"""Test the Squeezebox config flow."""
from http import HTTPStatus
from unittest.mock import patch
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from homeassistant import config_entries
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.squeezebox.const import (
CONF_BROWSE_LIMIT,
CONF_HTTPS,
CONF_SERVER_LIST,
CONF_VOLUME_STEP,
DOMAIN,
)
@@ -16,449 +18,523 @@ from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT, CONF_USERNA
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
from .conftest import BROWSE_LIMIT, HOST, PORT, SERVER_UUIDS, VOLUME_STEP
from .conftest import SERVER_UUIDS
from tests.common import MockConfigEntry
USER_INPUT = {
CONF_HOST: HOST,
}
EDIT_INPUT = {
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
CONF_HTTPS: False,
}
# Use the same UUIDs defined in the conftest
TEST_UUID = SERVER_UUIDS[0]
@pytest.mark.usefixtures("mock_setup_entry")
async def test_options_form(hass: HomeAssistant) -> None:
"""Test we can configure options."""
entry = MockConfigEntry(
data={
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
CONF_HTTPS: False,
},
unique_id=SERVER_UUIDS[0],
domain=DOMAIN,
options={CONF_BROWSE_LIMIT: 1000, CONF_VOLUME_STEP: 5},
async def test_manual_setup(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_server: AsyncMock
) -> None:
"""Test we can finish a manual setup successfully."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(entry.entry_id)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "edit"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "init"
# simulate manual input of options
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={CONF_BROWSE_LIMIT: BROWSE_LIMIT, CONF_VOLUME_STEP: VOLUME_STEP},
)
# put some meaningful asserts here
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_BROWSE_LIMIT: BROWSE_LIMIT,
CONF_VOLUME_STEP: VOLUME_STEP,
}
async def test_user_flow_duplicate_entry(
hass: HomeAssistant,
mock_server,
mock_setup_entry,
mock_config_entry,
mock_discover_success,
) -> None:
"""Test user-initiated flow with existing entry (duplicate)."""
entry = mock_config_entry
entry.add_to_hass(hass)
mock_server.async_query.side_effect = query_success
mock_server.http_status = HTTPStatus.OK
with (
patch(
"homeassistant.components.squeezebox.config_flow.async_discover",
mock_discover_success,
),
patch("homeassistant.components.squeezebox.config_flow.TIMEOUT", 0.1),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.FORM
assert result["errors"] == {"base": "no_server_found"}
@pytest.mark.parametrize(
("discover_fixture", "expect_error", "expect_entry"),
[
("mock_discover_success", None, True),
("mock_discover_failure", "no_server_found", False),
],
)
async def test_user_flow_discovery_variants(
hass: HomeAssistant,
mock_server,
mock_setup_entry,
mock_discover_success,
mock_discover_failure,
discover_fixture,
expect_error,
expect_entry,
) -> None:
"""Test user-initiated flow variants: normal discovery and timeout."""
discover_func = (
mock_discover_success
if discover_fixture == "mock_discover_success"
else mock_discover_failure
)
mock_server.async_query.side_effect = lambda *args, **kwargs: {
"uuid": SERVER_UUIDS[0]
}
mock_server.http_status = HTTPStatus.OK
with (
patch(
"homeassistant.components.squeezebox.config_flow.async_discover",
discover_func,
),
patch("homeassistant.components.squeezebox.config_flow.TIMEOUT", 0.1),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
if expect_error:
assert result["type"] == FlowResultType.FORM
assert result["errors"] == {"base": expect_error}
return
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "edit"
assert CONF_HOST in result["data_schema"].schema
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], EDIT_INPUT
mock_server.async_query.return_value = {"uuid": TEST_UUID}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_HOST: "1.2.3.4", CONF_PORT: 9000},
)
if expect_entry:
assert result2["type"] == FlowResultType.CREATE_ENTRY
assert result2["title"] == HOST
assert result2["data"] == EDIT_INPUT
await hass.async_block_till_done()
assert len(mock_setup_entry.mock_calls) == 1
async def query_success(*args, **kwargs):
"""Simulate successful query returning UUID."""
return {"uuid": SERVER_UUIDS[0]}
async def query_cannot_connect(*args, **kwargs):
"""Simulate connection failure."""
return False # Simulate failure; set status separately
async def query_unauthorized(*args, **kwargs):
"""Simulate unauthorized access."""
return False # Simulate failure; set status separately
class SqueezeError(Exception):
"""Custom exception to simulate unexpected query failure."""
async def query_exception(*args, **kwargs):
"""Simulate unexpected exception."""
raise SqueezeError("Unexpected error")
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == TEST_UUID
assert result["title"] == "1.2.3.4"
assert result["data"][CONF_HOST] == "1.2.3.4"
assert len(mock_setup_entry.mock_calls) == 1
@pytest.mark.parametrize(
("discovery_data", "query_behavior", "http_status", "expect_error"),
("query_return", "http_status", "expected_error"),
[
(
{CONF_HOST: HOST, CONF_PORT: PORT, "uuid": SERVER_UUIDS[0]},
query_success,
HTTPStatus.OK,
None,
), # UUID present, success
(
{CONF_HOST: HOST, CONF_PORT: PORT, CONF_HTTPS: False},
query_unauthorized,
HTTPStatus.UNAUTHORIZED,
"invalid_auth",
), # No UUID, unauthorized
(
{CONF_HOST: HOST, CONF_PORT: PORT, CONF_HTTPS: False},
query_cannot_connect,
HTTPStatus.BAD_GATEWAY,
"cannot_connect",
), # No UUID, connection failure
(
{CONF_HOST: HOST, CONF_PORT: PORT, CONF_HTTPS: False},
query_exception,
None,
"unknown",
), # No UUID, unexpected exception
(False, HTTPStatus.UNAUTHORIZED, "invalid_auth"),
(False, HTTPStatus.NOT_FOUND, "cannot_connect"),
({"no_uuid": True}, HTTPStatus.OK, "missing_uuid"),
],
)
async def test_discovery_flow_variants(
async def test_manual_setup_data_errors(
hass: HomeAssistant,
mock_server,
mock_setup_entry,
discovery_data,
query_behavior,
http_status,
expect_error,
mock_setup_entry: AsyncMock,
mock_server: AsyncMock,
query_return: Any,
http_status: HTTPStatus,
expected_error: str,
) -> None:
"""Test integration discovery flow with and without UUID."""
"""Test data-driven error states during manual setup and recovery."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "edit"}
)
# Inject behavior into mock_server
mock_server.async_query.side_effect = query_behavior
mock_server.async_query.return_value = query_return
mock_server.http_status = http_status
# Start flow
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: "1.2.3.4", CONF_PORT: 9000}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == expected_error
mock_server.async_query.return_value = {"uuid": TEST_UUID}
mock_server.http_status = HTTPStatus.OK
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: "1.2.3.4",
CONF_PORT: 9000,
CONF_USERNAME: "admin",
CONF_PASSWORD: "password",
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == TEST_UUID
assert len(mock_setup_entry.mock_calls) == 1
async def test_manual_setup_exception_error(
hass: HomeAssistant,
mock_setup_entry: AsyncMock,
mock_server: AsyncMock,
) -> None:
"""Test exception error state during manual setup and recovery."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "edit"}
)
mock_server.async_query.side_effect = Exception("Connection fail")
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: "1.2.3.4", CONF_PORT: 9000}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == "unknown"
mock_server.async_query.side_effect = None
mock_server.async_query.return_value = {"uuid": TEST_UUID}
mock_server.http_status = HTTPStatus.OK
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: "1.2.3.4",
CONF_PORT: 9000,
CONF_USERNAME: "admin",
CONF_PASSWORD: "password",
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == TEST_UUID
assert len(mock_setup_entry.mock_calls) == 1
async def test_manual_setup_recovery(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_server: AsyncMock
) -> None:
"""Test manual setup error recovery."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "edit"}
)
mock_server.async_query.return_value = False
mock_server.http_status = HTTPStatus.UNAUTHORIZED
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_HOST: "1.2.3.4", CONF_PORT: 9000},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit"
assert result["errors"]["base"] == "invalid_auth"
mock_server.async_query.return_value = {"uuid": TEST_UUID}
mock_server.http_status = HTTPStatus.OK
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: "1.2.3.4",
CONF_PORT: 9000,
CONF_USERNAME: "admin",
CONF_PASSWORD: "correct_password",
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == TEST_UUID
assert result["title"] == "1.2.3.4"
assert result["data"][CONF_HOST] == "1.2.3.4"
assert len(mock_setup_entry.mock_calls) == 1
async def test_duplicate_setup(
hass: HomeAssistant,
mock_setup_entry: AsyncMock,
mock_server: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test abort if setting up an already configured server."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "edit"}
)
mock_server.async_query.return_value = {"uuid": TEST_UUID}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_HOST: "1.2.3.4", CONF_PORT: 9000},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_discovery_flow_success(
hass: HomeAssistant,
mock_setup_entry: AsyncMock,
mock_server: AsyncMock,
mock_discover: MagicMock,
) -> None:
"""Test discovery flow where default connect succeeds immediately."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "start_discovery"}
)
assert result["type"] is FlowResultType.SHOW_PROGRESS
assert result["step_id"] == "start_discovery"
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "choose_server"
mock_server.async_query.return_value = {"uuid": TEST_UUID}
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_SERVER_LIST: "1.1.1.1"}
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == TEST_UUID
assert result["title"] == "1.1.1.1"
assert len(mock_setup_entry.mock_calls) == 1
async def test_discovery_flow_edit_discovered_success(
hass: HomeAssistant,
mock_setup_entry: AsyncMock,
mock_server: AsyncMock,
mock_discover: MagicMock,
) -> None:
"""Test the successful outcome of the edit_discovered step."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "start_discovery"}
)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
mock_server.http_status = HTTPStatus.UNAUTHORIZED
mock_server.async_query.return_value = False
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_SERVER_LIST: "1.1.1.1"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_discovered"
mock_server.http_status = HTTPStatus.OK
mock_server.async_query.side_effect = [False, {"uuid": TEST_UUID}]
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_USERNAME: "admin", CONF_PASSWORD: "password"}
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "1.1.1.1"
assert result["data"][CONF_HOST] == "1.1.1.1"
assert result["data"][CONF_USERNAME] == "admin"
assert result["data"][CONF_PASSWORD] == "password"
assert len(mock_setup_entry.mock_calls) == 1
@pytest.mark.parametrize(
("side_effect", "http_status", "expected_error"),
[
(False, HTTPStatus.UNAUTHORIZED, "invalid_auth"),
(False, HTTPStatus.NOT_FOUND, "cannot_connect"),
({"no_uuid": True}, HTTPStatus.OK, "missing_uuid"),
(Exception("Test error"), None, "unknown"),
],
)
async def test_discovery_flow_edit_discovered_errors(
hass: HomeAssistant,
mock_setup_entry: AsyncMock,
mock_server: AsyncMock,
mock_discover: MagicMock,
side_effect: Any,
http_status: HTTPStatus | None,
expected_error: str,
) -> None:
"""Test all error outcomes of the edit_discovered step."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "start_discovery"}
)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
mock_server.http_status = HTTPStatus.UNAUTHORIZED
mock_server.async_query.return_value = False
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_SERVER_LIST: "1.1.1.1"}
)
mock_server.http_status = http_status
mock_server.async_query.side_effect = [False, side_effect]
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_USERNAME: "admin", CONF_PASSWORD: "password"}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == expected_error
async def test_discovery_flow_failed(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_discover: MagicMock
) -> None:
"""Test discovery flow when no servers are found."""
async def _failed_discover(callback: Any) -> list:
return []
mock_discover.side_effect = _failed_discover
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "start_discovery"}
)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "discovery_failed"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "edit"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit"
async def test_discovery_ignores_existing(
hass: HomeAssistant,
mock_setup_entry: AsyncMock,
mock_config_entry: MockConfigEntry,
mock_discover: MagicMock,
) -> None:
"""Test discovery properly ignores a server that's already configured."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "start_discovery"}
)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "discovery_failed"
async def test_integration_discovery(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_server: AsyncMock
) -> None:
"""Test integration discovery flow with a provided UUID."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data=discovery_data,
data={CONF_HOST: "1.2.3.4", CONF_PORT: 9000, "uuid": TEST_UUID},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_integration_discovered"
mock_server.async_query.return_value = {"uuid": TEST_UUID}
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_USERNAME: "admin", CONF_PASSWORD: "password"}
)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "edit"
# First configure attempt
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
CONF_HTTPS: False,
},
)
if expect_error:
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": expect_error}
# Recovery attempt
mock_server.async_query.side_effect = query_success
mock_server.http_status = HTTPStatus.OK
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"],
{
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
CONF_HTTPS: False,
},
)
result = result3
else:
result = result2
# Final assertions
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["title"] == HOST
assert result["data"] == {
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
CONF_HTTPS: False,
}
assert result["context"]["unique_id"] == SERVER_UUIDS[0]
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == TEST_UUID
assert result["title"] == "1.2.3.4"
assert len(mock_setup_entry.mock_calls) == 1
async def test_dhcp_discovery_flow_success(
hass: HomeAssistant,
mock_server,
mock_setup_entry,
mock_discover_success,
dhcp_info,
async def test_integration_discovery_no_uuid(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_server: AsyncMock
) -> None:
"""Test DHCP discovery flow with successful discovery and query."""
"""Test integration discovery flow without a UUID."""
mock_server.async_query.return_value = {"uuid": TEST_UUID}
# Inject successful query behavior
mock_server.async_query.side_effect = query_success
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data={CONF_HOST: "1.2.3.4", CONF_PORT: 9000},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_integration_discovered"
async def test_integration_discovery_no_uuid_fails(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_server: AsyncMock
) -> None:
"""Test integration discovery flow routes to form when connection fails."""
mock_server.async_query.return_value = False
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data={CONF_HOST: "1.2.3.4", CONF_PORT: 9000},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_integration_discovered"
async def test_integration_discovery_edit_recovery(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_server: AsyncMock
) -> None:
"""Test editing an integration discovery returns errors and can recover."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data={CONF_HOST: "1.2.3.4", CONF_PORT: 9000, "uuid": TEST_UUID},
)
mock_server.async_query.return_value = False
mock_server.http_status = HTTPStatus.UNAUTHORIZED
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_USERNAME: "admin", CONF_PASSWORD: "wrongpassword"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_integration_discovered"
assert result["errors"]["base"] == "invalid_auth"
mock_server.async_query.return_value = {"uuid": TEST_UUID}
mock_server.http_status = HTTPStatus.OK
with (
patch(
"homeassistant.components.squeezebox.config_flow.async_discover",
mock_discover_success,
),
patch("homeassistant.components.squeezebox.config_flow.TIMEOUT", 0.1),
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=dhcp_info,
)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "edit"
# Final configure step
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], EDIT_INPUT
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_USERNAME: "admin", CONF_PASSWORD: "correctpassword"}
)
assert result2["type"] == FlowResultType.CREATE_ENTRY
assert result2["title"] == "1.1.1.1"
assert result2["data"] == EDIT_INPUT
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == TEST_UUID
assert result["title"] == "1.2.3.4"
assert len(mock_setup_entry.mock_calls) == 1
async def test_dhcp_discovery_existing_player(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
dhcp_info: DhcpServiceInfo,
async def test_dhcp_unknown_player(
hass: HomeAssistant, dhcp_info: dict[str, Any]
) -> None:
"""Test that we properly ignore known players during DHCP discovery."""
# Register a squeezebox media_player entity with the same MAC unique_id
entity_registry.async_get_or_create(
domain="media_player",
platform=DOMAIN,
unique_id=format_mac("aabbccddeeff"),
)
# Fire DHCP discovery for the same MAC
"""Test DHCP discovery of an unconfigured player routes to user setup."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=dhcp_info,
)
# Flow should abort because the player is already known
assert result["type"] == FlowResultType.ABORT
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
async def test_dhcp_known_player(
hass: HomeAssistant, dhcp_info: dict[str, Any], mock_config_entry: MockConfigEntry
) -> None:
"""Test DHCP discovery aborts if player is already registered."""
mock_config_entry.add_to_hass(hass)
entity_registry = er.async_get(hass)
entity_registry.async_get_or_create(
MP_DOMAIN, DOMAIN, "aa:bb:cc:dd:ee:ff", config_entry=mock_config_entry
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=dhcp_info,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
@pytest.mark.parametrize(
("query_behavior", "http_status", "expected_error"),
[
(query_unauthorized, HTTPStatus.UNAUTHORIZED, "invalid_auth"),
(query_cannot_connect, HTTPStatus.BAD_GATEWAY, "cannot_connect"),
(query_exception, None, "unknown"),
],
)
async def test_flow_errors_and_recovery(
hass: HomeAssistant,
mock_setup_entry,
mock_server,
query_behavior,
http_status,
expected_error,
patch_discover,
async def test_options_flow(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:
"""Test config flow error handling and recovery."""
"""Test the options flow."""
mock_config_entry.add_to_hass(hass)
# Start flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], USER_INPUT
)
assert result["step_id"] == "edit"
result = await hass.config_entries.options.async_init(mock_config_entry.entry_id)
# Inject error
mock_server.async_query.side_effect = query_behavior
mock_server.http_status = http_status
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], EDIT_INPUT
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": expected_error}
# Recover
mock_server.async_query.side_effect = None
mock_server.async_query.return_value = {"uuid": SERVER_UUIDS[0]}
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"], EDIT_INPUT
)
assert result3["type"] == FlowResultType.CREATE_ENTRY
assert result3["title"] == HOST
assert result3["data"] == EDIT_INPUT
async def test_form_missing_uuid(hass: HomeAssistant) -> None:
"""Test we handle cannot connect error, then succeed after retry."""
# Start the flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "edit"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "init"
# First attempt: simulate cannot connect
with patch(
"pysqueezebox.Server.async_query",
return_value={"some_other_key": "some_value"},
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
},
)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={CONF_BROWSE_LIMIT: 500, CONF_VOLUME_STEP: 5},
)
# We should still be in a form, with an error
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": "missing_uuid"}
# Second attempt: simulate a successful connection
with patch(
"pysqueezebox.Server.async_query",
return_value={"uuid": SERVER_UUIDS[0]},
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
CONF_HTTPS: False,
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == HOST # the flow uses host as title
assert result["data"] == {
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_USERNAME: "",
CONF_PASSWORD: "",
CONF_HTTPS: False,
}
assert result["context"]["unique_id"] == SERVER_UUIDS[0]
assert result["type"] is FlowResultType.CREATE_ENTRY
assert mock_config_entry.options == {
CONF_BROWSE_LIMIT: 500,
CONF_VOLUME_STEP: 5,
}