mirror of
https://github.com/home-assistant/core.git
synced 2026-05-08 17:49:37 +01:00
Migrate hdmi_cec to async (#168306)
This commit is contained in:
committed by
GitHub
parent
6bc3fcef36
commit
49e5b03c08
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.entity import Entity
|
||||
|
||||
from .const import DOMAIN, EVENT_HDMI_CEC_UNAVAILABLE
|
||||
@@ -55,9 +56,10 @@ class CecEntity(Entity):
|
||||
else:
|
||||
self._attr_name = f"{self._device.type_name} {self._logical_address} ({self._device.osd_name})"
|
||||
|
||||
@callback
|
||||
def _hdmi_cec_unavailable(self, callback_event):
|
||||
self._attr_available = False
|
||||
self.schedule_update_ha_state(False)
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""Register HDMI callbacks after initialization."""
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from pycec.commands import CecCommand, KeyPressCommand, KeyReleaseCommand
|
||||
from pycec.const import (
|
||||
@@ -31,7 +30,6 @@ from homeassistant.components.media_player import (
|
||||
MediaPlayerEntity,
|
||||
MediaPlayerEntityFeature,
|
||||
MediaPlayerState,
|
||||
MediaType,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
@@ -45,20 +43,20 @@ _LOGGER = logging.getLogger(__name__)
|
||||
ENTITY_ID_FORMAT = MP_DOMAIN + ".{}"
|
||||
|
||||
|
||||
def setup_platform(
|
||||
async def async_setup_platform(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
discovery_info: DiscoveryInfoType | None = None,
|
||||
) -> None:
|
||||
"""Find and return HDMI devices as +switches."""
|
||||
"""Find and return HDMI devices as media players."""
|
||||
if discovery_info and ATTR_NEW in discovery_info:
|
||||
_LOGGER.debug("Setting up HDMI devices %s", discovery_info[ATTR_NEW])
|
||||
entities = []
|
||||
for device in discovery_info[ATTR_NEW]:
|
||||
hdmi_device = hass.data[DOMAIN][device]
|
||||
entities.append(CecPlayerEntity(hdmi_device, hdmi_device.logical_address))
|
||||
add_entities(entities, True)
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
class CecPlayerEntity(CecEntity, MediaPlayerEntity):
|
||||
@@ -79,78 +77,61 @@ class CecPlayerEntity(CecEntity, MediaPlayerEntity):
|
||||
|
||||
def send_playback(self, key):
|
||||
"""Send playback status to CEC adapter."""
|
||||
self._device.async_send_command(CecCommand(key, dst=self._logical_address))
|
||||
self._device.send_command(CecCommand(key, dst=self._logical_address))
|
||||
|
||||
def mute_volume(self, mute: bool) -> None:
|
||||
async def async_mute_volume(self, mute: bool) -> None:
|
||||
"""Mute volume."""
|
||||
self.send_keypress(KEY_MUTE_TOGGLE)
|
||||
|
||||
def media_previous_track(self) -> None:
|
||||
async def async_media_previous_track(self) -> None:
|
||||
"""Go to previous track."""
|
||||
self.send_keypress(KEY_BACKWARD)
|
||||
|
||||
def turn_on(self) -> None:
|
||||
async def async_turn_on(self) -> None:
|
||||
"""Turn device on."""
|
||||
self._device.turn_on()
|
||||
self._attr_state = MediaPlayerState.ON
|
||||
self.async_write_ha_state()
|
||||
|
||||
def clear_playlist(self) -> None:
|
||||
"""Clear players playlist."""
|
||||
raise NotImplementedError
|
||||
|
||||
def turn_off(self) -> None:
|
||||
async def async_turn_off(self) -> None:
|
||||
"""Turn device off."""
|
||||
self._device.turn_off()
|
||||
self._attr_state = MediaPlayerState.OFF
|
||||
self.async_write_ha_state()
|
||||
|
||||
def media_stop(self) -> None:
|
||||
async def async_media_stop(self) -> None:
|
||||
"""Stop playback."""
|
||||
self.send_keypress(KEY_STOP)
|
||||
self._attr_state = MediaPlayerState.IDLE
|
||||
self.async_write_ha_state()
|
||||
|
||||
def play_media(
|
||||
self, media_type: MediaType | str, media_id: str, **kwargs: Any
|
||||
) -> None:
|
||||
"""Not supported."""
|
||||
raise NotImplementedError
|
||||
|
||||
def media_next_track(self) -> None:
|
||||
async def async_media_next_track(self) -> None:
|
||||
"""Skip to next track."""
|
||||
self.send_keypress(KEY_FORWARD)
|
||||
|
||||
def media_seek(self, position: float) -> None:
|
||||
"""Not supported."""
|
||||
raise NotImplementedError
|
||||
|
||||
def set_volume_level(self, volume: float) -> None:
|
||||
"""Set volume level, range 0..1."""
|
||||
raise NotImplementedError
|
||||
|
||||
def media_pause(self) -> None:
|
||||
async def async_media_pause(self) -> None:
|
||||
"""Pause playback."""
|
||||
self.send_keypress(KEY_PAUSE)
|
||||
self._attr_state = MediaPlayerState.PAUSED
|
||||
self.async_write_ha_state()
|
||||
|
||||
def select_source(self, source: str) -> None:
|
||||
"""Not supported."""
|
||||
raise NotImplementedError
|
||||
|
||||
def media_play(self) -> None:
|
||||
async def async_media_play(self) -> None:
|
||||
"""Start playback."""
|
||||
self.send_keypress(KEY_PLAY)
|
||||
self._attr_state = MediaPlayerState.PLAYING
|
||||
self.async_write_ha_state()
|
||||
|
||||
def volume_up(self) -> None:
|
||||
async def async_volume_up(self) -> None:
|
||||
"""Increase volume."""
|
||||
_LOGGER.debug("%s: volume up", self._logical_address)
|
||||
self.send_keypress(KEY_VOLUME_UP)
|
||||
|
||||
def volume_down(self) -> None:
|
||||
async def async_volume_down(self) -> None:
|
||||
"""Decrease volume."""
|
||||
_LOGGER.debug("%s: volume down", self._logical_address)
|
||||
self.send_keypress(KEY_VOLUME_DOWN)
|
||||
|
||||
def update(self) -> None:
|
||||
async def async_update(self) -> None:
|
||||
"""Update device status."""
|
||||
device = self._device
|
||||
if device.power_status in [POWER_OFF, 3]:
|
||||
|
||||
@@ -20,10 +20,10 @@ _LOGGER = logging.getLogger(__name__)
|
||||
ENTITY_ID_FORMAT = SWITCH_DOMAIN + ".{}"
|
||||
|
||||
|
||||
def setup_platform(
|
||||
async def async_setup_platform(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
discovery_info: DiscoveryInfoType | None = None,
|
||||
) -> None:
|
||||
"""Find and return HDMI devices as switches."""
|
||||
@@ -33,7 +33,7 @@ def setup_platform(
|
||||
for device in discovery_info[ATTR_NEW]:
|
||||
hdmi_device = hass.data[DOMAIN][device]
|
||||
entities.append(CecSwitchEntity(hdmi_device, hdmi_device.logical_address))
|
||||
add_entities(entities, True)
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
class CecSwitchEntity(CecEntity, SwitchEntity):
|
||||
@@ -44,19 +44,19 @@ class CecSwitchEntity(CecEntity, SwitchEntity):
|
||||
CecEntity.__init__(self, device, logical)
|
||||
self.entity_id = f"{SWITCH_DOMAIN}.hdmi_{hex(self._logical_address)[2:]}"
|
||||
|
||||
def turn_on(self, **kwargs: Any) -> None:
|
||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn device on."""
|
||||
self._device.turn_on()
|
||||
self._attr_is_on = True
|
||||
self.schedule_update_ha_state(force_refresh=False)
|
||||
self.async_write_ha_state()
|
||||
|
||||
def turn_off(self, **kwargs: Any) -> None:
|
||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn device off."""
|
||||
self._device.turn_off()
|
||||
self._attr_is_on = False
|
||||
self.schedule_update_ha_state(force_refresh=False)
|
||||
self.async_write_ha_state()
|
||||
|
||||
def update(self) -> None:
|
||||
async def async_update(self) -> None:
|
||||
"""Update device status."""
|
||||
device = self._device
|
||||
if device.power_status in {POWER_OFF, 3}:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Tests for the HDMI-CEC media player platform."""
|
||||
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
from pycec.const import (
|
||||
@@ -58,39 +57,6 @@ from homeassistant.core import HomeAssistant
|
||||
from . import MockHDMIDevice, assert_key_press_release
|
||||
from .conftest import CecEntityCreator, HDMINetworkCreator
|
||||
|
||||
type AssertState = Callable[[str, str], None]
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
name="assert_state",
|
||||
params=[
|
||||
False,
|
||||
pytest.param(
|
||||
True,
|
||||
marks=pytest.mark.xfail(
|
||||
reason="""State isn't updated because the function is missing the
|
||||
`schedule_update_ha_state` for a correct push entity. Would still
|
||||
update once the data comes back from the device."""
|
||||
),
|
||||
),
|
||||
],
|
||||
ids=["skip_assert_state", "run_assert_state"],
|
||||
)
|
||||
def assert_state_fixture(request: pytest.FixtureRequest) -> AssertState:
|
||||
"""Allow for skipping the assert state changes.
|
||||
|
||||
This is broken in this entity, but we still want to test that
|
||||
the rest of the code works as expected.
|
||||
"""
|
||||
|
||||
def _test_state(state: str, expected: str) -> None:
|
||||
if request.param:
|
||||
assert state == expected
|
||||
else:
|
||||
assert True
|
||||
|
||||
return _test_state
|
||||
|
||||
|
||||
async def test_load_platform(
|
||||
hass: HomeAssistant,
|
||||
@@ -142,7 +108,6 @@ async def test_service_on(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
) -> None:
|
||||
"""Test that media_player triggers on `on` service."""
|
||||
hdmi_network = await create_hdmi_network({"platform": "media_player"})
|
||||
@@ -157,19 +122,17 @@ async def test_service_on(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
mock_hdmi_device.turn_on.assert_called_once_with()
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, STATE_ON)
|
||||
assert state.state == STATE_ON
|
||||
|
||||
|
||||
async def test_service_off(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
) -> None:
|
||||
"""Test that media_player triggers on `off` service."""
|
||||
hdmi_network = await create_hdmi_network({"platform": "media_player"})
|
||||
@@ -188,7 +151,7 @@ async def test_service_off(
|
||||
mock_hdmi_device.turn_off.assert_called_once_with()
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, STATE_OFF)
|
||||
assert state.state == STATE_OFF
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -317,7 +280,6 @@ async def test_volume_services(
|
||||
data,
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
|
||||
@@ -348,7 +310,6 @@ async def test_track_change_services(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
|
||||
@@ -373,7 +334,6 @@ async def test_playback_services(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
service: str,
|
||||
key: int,
|
||||
expected_state: str,
|
||||
@@ -389,13 +349,12 @@ async def test_playback_services(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, expected_state)
|
||||
assert state.state == expected_state
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="PLAY feature isn't enabled")
|
||||
@@ -403,7 +362,6 @@ async def test_play_pause_service(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
) -> None:
|
||||
"""Test play pause service."""
|
||||
hdmi_network = await create_hdmi_network({"platform": "media_player"})
|
||||
@@ -418,13 +376,12 @@ async def test_play_pause_service(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=KEY_PAUSE)
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, STATE_PAUSED)
|
||||
assert state.state == STATE_PAUSED
|
||||
|
||||
await hass.services.async_call(
|
||||
MEDIA_PLAYER_DOMAIN,
|
||||
@@ -432,7 +389,6 @@ async def test_play_pause_service(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 4
|
||||
assert_key_press_release(mock_hdmi_device.send_command, 1, dst=3, key=KEY_PLAY)
|
||||
@@ -527,9 +483,6 @@ async def test_starting_state(
|
||||
assert state.state == expected_state
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
reason="The code only sets the state to unavailable, doesn't set the `_attr_available` to false."
|
||||
)
|
||||
async def test_unavailable_status(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
@@ -541,6 +494,7 @@ async def test_unavailable_status(
|
||||
await create_cec_entity(hdmi_network, mock_hdmi_device)
|
||||
|
||||
hass.bus.async_fire(EVENT_HDMI_CEC_UNAVAILABLE)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert state.state == STATE_UNAVAILABLE
|
||||
|
||||
Reference in New Issue
Block a user