"""Tests for Sonos services.""" import asyncio from contextlib import asynccontextmanager import logging import re from unittest.mock import Mock, patch import pytest from homeassistant.components.media_player import ( DOMAIN as MP_DOMAIN, SERVICE_JOIN, SERVICE_UNJOIN, ) from homeassistant.components.sonos.const import LONG_SERVICE_TIMEOUT from homeassistant.const import ATTR_ENTITY_ID from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers import entity_registry as er from .conftest import MockSoCo, create_zgs_sonos_event, group_speakers, ungroup_speakers async def test_media_player_join( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], caplog: pytest.LogCaptureFixture, ) -> None: """Test joining two speakers together.""" soco_living_room = sonos_setup_two_speakers[0] soco_bedroom = sonos_setup_two_speakers[1] # After dispatching the join to the speakers, the integration waits for the # group to be updated before returning. To simulate this we will dispatch # a ZGS event to group the speaker. This event is # triggered by the firing of the join_complete_event in the join mock. join_complete_event = asyncio.Event() def mock_join(*args, **kwargs) -> None: hass.loop.call_soon_threadsafe(join_complete_event.set) soco_bedroom.join = Mock(side_effect=mock_join) with caplog.at_level(logging.WARNING): caplog.clear() await hass.services.async_call( MP_DOMAIN, SERVICE_JOIN, { "entity_id": "media_player.living_room", "group_members": ["media_player.bedroom"], }, blocking=False, ) await join_complete_event.wait() # Fire the ZGS event to update the speaker grouping as the join method is waiting # for the speakers to be regrouped. group_speakers(soco_living_room, soco_bedroom) await hass.async_block_till_done(wait_background_tasks=True) # Code logs warning messages if the join is not successful, so we check # that no warning messages were logged. assert len(caplog.records) == 0 # The API joins the group members to the entity_id speaker. assert soco_bedroom.join.call_count == 1 assert soco_bedroom.join.call_args[0][0] == soco_living_room assert soco_living_room.join.call_count == 0 async def test_media_player_join_bad_entity( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], ) -> None: """Test error handling of joining with a bad entity.""" # Ensure an error is raised if the entity is unknown with pytest.raises(HomeAssistantError) as excinfo: await hass.services.async_call( MP_DOMAIN, SERVICE_JOIN, { "entity_id": "media_player.living_room", "group_members": "media_player.bad_entity", }, blocking=True, ) assert "media_player.bad_entity" in str(excinfo.value) async def test_media_player_join_entity_no_speaker( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], entity_registry: er.EntityRegistry, ) -> None: """Test error handling of joining with no associated speaker.""" bad_media_player = entity_registry.async_get_or_create( "media_player", "demo", "1234" ) # Ensure an error is raised if the entity does not have a speaker with pytest.raises(HomeAssistantError) as excinfo: await hass.services.async_call( MP_DOMAIN, SERVICE_JOIN, { "entity_id": "media_player.living_room", "group_members": bad_media_player.entity_id, }, blocking=True, ) assert bad_media_player.entity_id in str(excinfo.value) @asynccontextmanager async def instant_timeout(*args, **kwargs) -> None: """Mock a timeout error.""" raise TimeoutError # This is never reached, but is needed to satisfy the asynccontextmanager yield # pylint: disable=unreachable async def test_media_player_join_timeout( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], caplog: pytest.LogCaptureFixture, ) -> None: """Test joining of two speakers with timeout error.""" soco_living_room = sonos_setup_two_speakers[0] soco_bedroom = sonos_setup_two_speakers[1] expected = ( "Timeout while waiting for Sonos player to join the " "group Living Room: Living Room, Bedroom" ) with ( patch( "homeassistant.components.sonos.speaker.asyncio.timeout", instant_timeout ), pytest.raises(HomeAssistantError, match=re.escape(expected)), ): await hass.services.async_call( MP_DOMAIN, SERVICE_JOIN, { "entity_id": "media_player.living_room", "group_members": ["media_player.bedroom"], }, blocking=True, ) assert soco_bedroom.join.call_count == 1 assert soco_bedroom.join.call_args[0][0] == soco_living_room assert soco_living_room.join.call_count == 0 async def test_media_player_unjoin_timeout( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], ) -> None: """Test unjoining of speaker with timeout error.""" soco_living_room = sonos_setup_two_speakers[0] soco_bedroom = sonos_setup_two_speakers[1] # First group the speakers together group_speakers(soco_living_room, soco_bedroom) await hass.async_block_till_done(wait_background_tasks=True) expected = ( "Timeout while waiting for Sonos player to unjoin the group Bedroom: Bedroom" ) with ( patch( "homeassistant.components.sonos.speaker.asyncio.timeout", instant_timeout ), pytest.raises(HomeAssistantError, match=re.escape(expected)), ): await hass.services.async_call( MP_DOMAIN, SERVICE_UNJOIN, {ATTR_ENTITY_ID: "media_player.bedroom"}, blocking=True, ) assert soco_bedroom.unjoin.call_count == 1 async def test_media_player_unjoin( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], caplog: pytest.LogCaptureFixture, ) -> None: """Test unjoing two speaker.""" soco_living_room = sonos_setup_two_speakers[0] soco_bedroom = sonos_setup_two_speakers[1] # First group the speakers together group_speakers(soco_living_room, soco_bedroom) await hass.async_block_till_done(wait_background_tasks=True) # Now that the speaker are joined, test unjoining unjoin_complete_event = asyncio.Event() def mock_unjoin(*args, **kwargs): hass.loop.call_soon_threadsafe(unjoin_complete_event.set) soco_bedroom.unjoin = Mock(side_effect=mock_unjoin) with caplog.at_level(logging.WARNING): caplog.clear() await hass.services.async_call( MP_DOMAIN, SERVICE_UNJOIN, {"entity_id": "media_player.bedroom"}, blocking=False, ) await unjoin_complete_event.wait() # Fire the ZGS event to ungroup the speakers as the unjoin method is waiting # for the speakers to be ungrouped. ungroup_speakers(soco_living_room, soco_bedroom) await hass.async_block_till_done(wait_background_tasks=True) assert len(caplog.records) == 0 soco_bedroom.unjoin.assert_called_with(timeout=LONG_SERVICE_TIMEOUT) assert soco_living_room.unjoin.call_count == 0 async def test_media_player_unjoin_already_unjoined( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], caplog: pytest.LogCaptureFixture, ) -> None: """Test unjoining when already unjoined.""" soco_living_room = sonos_setup_two_speakers[0] soco_bedroom = sonos_setup_two_speakers[1] with caplog.at_level(logging.WARNING): caplog.clear() await hass.services.async_call( MP_DOMAIN, SERVICE_UNJOIN, {"entity_id": "media_player.bedroom"}, blocking=True, ) assert len(caplog.records) == 0 # Should not have called unjoin, since the speakers are already unjoined. assert soco_bedroom.unjoin.call_count == 0 assert soco_living_room.unjoin.call_count == 0 async def test_unjoin_completes_when_coordinator_receives_event_first( hass: HomeAssistant, sonos_setup_two_speakers: list[MockSoCo], caplog: pytest.LogCaptureFixture, ) -> None: """Test that unjoin completes even when only coordinator receives ZGS event.""" soco_living_room = sonos_setup_two_speakers[0] soco_bedroom = sonos_setup_two_speakers[1] # First, group the speakers together group_speakers(soco_living_room, soco_bedroom) await hass.async_block_till_done(wait_background_tasks=True) # Verify initial grouped state expected_group = ["media_player.living_room", "media_player.bedroom"] assert ( hass.states.get("media_player.living_room").attributes["group_members"] == expected_group ) assert ( hass.states.get("media_player.bedroom").attributes["group_members"] == expected_group ) unjoin_complete_event = asyncio.Event() def mock_unjoin(*args, **kwargs) -> None: hass.loop.call_soon_threadsafe(unjoin_complete_event.set) soco_bedroom.unjoin = Mock(side_effect=mock_unjoin) with caplog.at_level(logging.WARNING): caplog.clear() await hass.services.async_call( MP_DOMAIN, SERVICE_UNJOIN, {ATTR_ENTITY_ID: "media_player.bedroom"}, blocking=False, ) await unjoin_complete_event.wait() # Fire ZGS event only to coordinator to test clearing of bedroom speaker ungroup_event = create_zgs_sonos_event( "zgs_two_single.xml", soco_living_room, soco_bedroom, create_uui_ds_in_group=False, ) soco_living_room.zoneGroupTopology.subscribe.return_value._callback( ungroup_event ) await hass.async_block_till_done(wait_background_tasks=True) # Should complete without warnings or timeout errors assert len(caplog.records) == 0 assert soco_bedroom.unjoin.call_count == 1 state = hass.states.get("media_player.living_room") assert state.attributes["group_members"] == ["media_player.living_room"] state = hass.states.get("media_player.bedroom") assert state.attributes["group_members"] == ["media_player.bedroom"]