mirror of
https://github.com/home-assistant/core.git
synced 2025-12-24 21:06:19 +00:00
Restructure device tracker (#23862)
* Restructure device tracker * Docstyle * Fix typing * Lint * Lint * Fix tests
This commit is contained in:
528
homeassistant/components/device_tracker/legacy.py
Normal file
528
homeassistant/components/device_tracker/legacy.py
Normal file
@@ -0,0 +1,528 @@
|
||||
"""Legacy device tracker classes."""
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
from typing import Any, List, Sequence
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.components import zone
|
||||
from homeassistant.components.group import (
|
||||
ATTR_ADD_ENTITIES, ATTR_ENTITIES, ATTR_OBJECT_ID, ATTR_VISIBLE,
|
||||
DOMAIN as DOMAIN_GROUP, SERVICE_SET)
|
||||
from homeassistant.components.zone.zone import async_active_zone
|
||||
from homeassistant.config import load_yaml_config_file, async_log_exception
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.restore_state import RestoreEntity
|
||||
from homeassistant.helpers.typing import GPSType, HomeAssistantType
|
||||
from homeassistant import util
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant.util.yaml import dump
|
||||
|
||||
from homeassistant.const import (
|
||||
ATTR_ENTITY_ID, ATTR_GPS_ACCURACY, ATTR_ICON, ATTR_LATITUDE,
|
||||
ATTR_LONGITUDE, ATTR_NAME, CONF_ICON, CONF_MAC, CONF_NAME,
|
||||
DEVICE_DEFAULT_NAME, STATE_NOT_HOME, STATE_HOME)
|
||||
|
||||
from .const import (
|
||||
ATTR_BATTERY,
|
||||
ATTR_HOST_NAME,
|
||||
ATTR_MAC,
|
||||
ATTR_SOURCE_TYPE,
|
||||
CONF_AWAY_HIDE,
|
||||
CONF_CONSIDER_HOME,
|
||||
CONF_NEW_DEVICE_DEFAULTS,
|
||||
CONF_TRACK_NEW,
|
||||
DEFAULT_AWAY_HIDE,
|
||||
DEFAULT_CONSIDER_HOME,
|
||||
DEFAULT_TRACK_NEW,
|
||||
DOMAIN,
|
||||
ENTITY_ID_FORMAT,
|
||||
LOGGER,
|
||||
SOURCE_TYPE_GPS,
|
||||
)
|
||||
|
||||
YAML_DEVICES = 'known_devices.yaml'
|
||||
GROUP_NAME_ALL_DEVICES = 'all devices'
|
||||
EVENT_NEW_DEVICE = 'device_tracker_new_device'
|
||||
|
||||
|
||||
async def get_tracker(hass, config):
|
||||
"""Create a tracker."""
|
||||
yaml_path = hass.config.path(YAML_DEVICES)
|
||||
|
||||
conf = config.get(DOMAIN, [])
|
||||
conf = conf[0] if conf else {}
|
||||
consider_home = conf.get(CONF_CONSIDER_HOME, DEFAULT_CONSIDER_HOME)
|
||||
|
||||
defaults = conf.get(CONF_NEW_DEVICE_DEFAULTS, {})
|
||||
track_new = conf.get(CONF_TRACK_NEW)
|
||||
if track_new is None:
|
||||
track_new = defaults.get(CONF_TRACK_NEW, DEFAULT_TRACK_NEW)
|
||||
|
||||
devices = await async_load_config(yaml_path, hass, consider_home)
|
||||
tracker = DeviceTracker(
|
||||
hass, consider_home, track_new, defaults, devices)
|
||||
return tracker
|
||||
|
||||
|
||||
class DeviceTracker:
|
||||
"""Representation of a device tracker."""
|
||||
|
||||
def __init__(self, hass: HomeAssistantType, consider_home: timedelta,
|
||||
track_new: bool, defaults: dict,
|
||||
devices: Sequence) -> None:
|
||||
"""Initialize a device tracker."""
|
||||
self.hass = hass
|
||||
self.devices = {dev.dev_id: dev for dev in devices}
|
||||
self.mac_to_dev = {dev.mac: dev for dev in devices if dev.mac}
|
||||
self.consider_home = consider_home
|
||||
self.track_new = track_new if track_new is not None \
|
||||
else defaults.get(CONF_TRACK_NEW, DEFAULT_TRACK_NEW)
|
||||
self.defaults = defaults
|
||||
self.group = None
|
||||
self._is_updating = asyncio.Lock(loop=hass.loop)
|
||||
|
||||
for dev in devices:
|
||||
if self.devices[dev.dev_id] is not dev:
|
||||
LOGGER.warning('Duplicate device IDs detected %s', dev.dev_id)
|
||||
if dev.mac and self.mac_to_dev[dev.mac] is not dev:
|
||||
LOGGER.warning('Duplicate device MAC addresses detected %s',
|
||||
dev.mac)
|
||||
|
||||
def see(self, mac: str = None, dev_id: str = None, host_name: str = None,
|
||||
location_name: str = None, gps: GPSType = None,
|
||||
gps_accuracy: int = None, battery: int = None,
|
||||
attributes: dict = None, source_type: str = SOURCE_TYPE_GPS,
|
||||
picture: str = None, icon: str = None,
|
||||
consider_home: timedelta = None):
|
||||
"""Notify the device tracker that you see a device."""
|
||||
self.hass.add_job(
|
||||
self.async_see(mac, dev_id, host_name, location_name, gps,
|
||||
gps_accuracy, battery, attributes, source_type,
|
||||
picture, icon, consider_home)
|
||||
)
|
||||
|
||||
async def async_see(
|
||||
self, mac: str = None, dev_id: str = None, host_name: str = None,
|
||||
location_name: str = None, gps: GPSType = None,
|
||||
gps_accuracy: int = None, battery: int = None,
|
||||
attributes: dict = None, source_type: str = SOURCE_TYPE_GPS,
|
||||
picture: str = None, icon: str = None,
|
||||
consider_home: timedelta = None):
|
||||
"""Notify the device tracker that you see a device.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
if mac is None and dev_id is None:
|
||||
raise HomeAssistantError('Neither mac or device id passed in')
|
||||
if mac is not None:
|
||||
mac = str(mac).upper()
|
||||
device = self.mac_to_dev.get(mac)
|
||||
if not device:
|
||||
dev_id = util.slugify(host_name or '') or util.slugify(mac)
|
||||
else:
|
||||
dev_id = cv.slug(str(dev_id).lower())
|
||||
device = self.devices.get(dev_id)
|
||||
|
||||
if device:
|
||||
await device.async_seen(
|
||||
host_name, location_name, gps, gps_accuracy, battery,
|
||||
attributes, source_type, consider_home)
|
||||
if device.track:
|
||||
await device.async_update_ha_state()
|
||||
return
|
||||
|
||||
# If no device can be found, create it
|
||||
dev_id = util.ensure_unique_string(dev_id, self.devices.keys())
|
||||
device = Device(
|
||||
self.hass, consider_home or self.consider_home, self.track_new,
|
||||
dev_id, mac, (host_name or dev_id).replace('_', ' '),
|
||||
picture=picture, icon=icon,
|
||||
hide_if_away=self.defaults.get(CONF_AWAY_HIDE, DEFAULT_AWAY_HIDE))
|
||||
self.devices[dev_id] = device
|
||||
if mac is not None:
|
||||
self.mac_to_dev[mac] = device
|
||||
|
||||
await device.async_seen(
|
||||
host_name, location_name, gps, gps_accuracy, battery, attributes,
|
||||
source_type)
|
||||
|
||||
if device.track:
|
||||
await device.async_update_ha_state()
|
||||
|
||||
# During init, we ignore the group
|
||||
if self.group and self.track_new:
|
||||
self.hass.async_create_task(
|
||||
self.hass.async_call(
|
||||
DOMAIN_GROUP, SERVICE_SET, {
|
||||
ATTR_OBJECT_ID: util.slugify(GROUP_NAME_ALL_DEVICES),
|
||||
ATTR_VISIBLE: False,
|
||||
ATTR_NAME: GROUP_NAME_ALL_DEVICES,
|
||||
ATTR_ADD_ENTITIES: [device.entity_id]}))
|
||||
|
||||
self.hass.bus.async_fire(EVENT_NEW_DEVICE, {
|
||||
ATTR_ENTITY_ID: device.entity_id,
|
||||
ATTR_HOST_NAME: device.host_name,
|
||||
ATTR_MAC: device.mac,
|
||||
})
|
||||
|
||||
# update known_devices.yaml
|
||||
self.hass.async_create_task(
|
||||
self.async_update_config(
|
||||
self.hass.config.path(YAML_DEVICES), dev_id, device)
|
||||
)
|
||||
|
||||
async def async_update_config(self, path, dev_id, device):
|
||||
"""Add device to YAML configuration file.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
async with self._is_updating:
|
||||
await self.hass.async_add_executor_job(
|
||||
update_config, self.hass.config.path(YAML_DEVICES),
|
||||
dev_id, device)
|
||||
|
||||
@callback
|
||||
def async_setup_group(self):
|
||||
"""Initialize group for all tracked devices.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
entity_ids = [dev.entity_id for dev in self.devices.values()
|
||||
if dev.track]
|
||||
|
||||
self.hass.async_create_task(
|
||||
self.hass.services.async_call(
|
||||
DOMAIN_GROUP, SERVICE_SET, {
|
||||
ATTR_OBJECT_ID: util.slugify(GROUP_NAME_ALL_DEVICES),
|
||||
ATTR_VISIBLE: False,
|
||||
ATTR_NAME: GROUP_NAME_ALL_DEVICES,
|
||||
ATTR_ENTITIES: entity_ids}))
|
||||
|
||||
@callback
|
||||
def async_update_stale(self, now: dt_util.dt.datetime):
|
||||
"""Update stale devices.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
for device in self.devices.values():
|
||||
if (device.track and device.last_update_home) and \
|
||||
device.stale(now):
|
||||
self.hass.async_create_task(device.async_update_ha_state(True))
|
||||
|
||||
async def async_setup_tracked_device(self):
|
||||
"""Set up all not exists tracked devices.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
async def async_init_single_device(dev):
|
||||
"""Init a single device_tracker entity."""
|
||||
await dev.async_added_to_hass()
|
||||
await dev.async_update_ha_state()
|
||||
|
||||
tasks = []
|
||||
for device in self.devices.values():
|
||||
if device.track and not device.last_seen:
|
||||
tasks.append(self.hass.async_create_task(
|
||||
async_init_single_device(device)))
|
||||
|
||||
if tasks:
|
||||
await asyncio.wait(tasks, loop=self.hass.loop)
|
||||
|
||||
|
||||
class Device(RestoreEntity):
|
||||
"""Represent a tracked device."""
|
||||
|
||||
host_name = None # type: str
|
||||
location_name = None # type: str
|
||||
gps = None # type: GPSType
|
||||
gps_accuracy = 0 # type: int
|
||||
last_seen = None # type: dt_util.dt.datetime
|
||||
consider_home = None # type: dt_util.dt.timedelta
|
||||
battery = None # type: int
|
||||
attributes = None # type: dict
|
||||
icon = None # type: str
|
||||
|
||||
# Track if the last update of this device was HOME.
|
||||
last_update_home = False
|
||||
_state = STATE_NOT_HOME
|
||||
|
||||
def __init__(self, hass: HomeAssistantType, consider_home: timedelta,
|
||||
track: bool, dev_id: str, mac: str, name: str = None,
|
||||
picture: str = None, gravatar: str = None, icon: str = None,
|
||||
hide_if_away: bool = False) -> None:
|
||||
"""Initialize a device."""
|
||||
self.hass = hass
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(dev_id)
|
||||
|
||||
# Timedelta object how long we consider a device home if it is not
|
||||
# detected anymore.
|
||||
self.consider_home = consider_home
|
||||
|
||||
# Device ID
|
||||
self.dev_id = dev_id
|
||||
self.mac = mac
|
||||
|
||||
# If we should track this device
|
||||
self.track = track
|
||||
|
||||
# Configured name
|
||||
self.config_name = name
|
||||
|
||||
# Configured picture
|
||||
if gravatar is not None:
|
||||
self.config_picture = get_gravatar_for_email(gravatar)
|
||||
else:
|
||||
self.config_picture = picture
|
||||
|
||||
self.icon = icon
|
||||
|
||||
self.away_hide = hide_if_away
|
||||
|
||||
self.source_type = None
|
||||
|
||||
self._attributes = {}
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the entity."""
|
||||
return self.config_name or self.host_name or DEVICE_DEFAULT_NAME
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Return the state of the device."""
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def entity_picture(self):
|
||||
"""Return the picture of the device."""
|
||||
return self.config_picture
|
||||
|
||||
@property
|
||||
def state_attributes(self):
|
||||
"""Return the device state attributes."""
|
||||
attr = {
|
||||
ATTR_SOURCE_TYPE: self.source_type
|
||||
}
|
||||
|
||||
if self.gps:
|
||||
attr[ATTR_LATITUDE] = self.gps[0]
|
||||
attr[ATTR_LONGITUDE] = self.gps[1]
|
||||
attr[ATTR_GPS_ACCURACY] = self.gps_accuracy
|
||||
|
||||
if self.battery:
|
||||
attr[ATTR_BATTERY] = self.battery
|
||||
|
||||
return attr
|
||||
|
||||
@property
|
||||
def device_state_attributes(self):
|
||||
"""Return device state attributes."""
|
||||
return self._attributes
|
||||
|
||||
@property
|
||||
def hidden(self):
|
||||
"""If device should be hidden."""
|
||||
return self.away_hide and self.state != STATE_HOME
|
||||
|
||||
async def async_seen(
|
||||
self, host_name: str = None, location_name: str = None,
|
||||
gps: GPSType = None, gps_accuracy=0, battery: int = None,
|
||||
attributes: dict = None,
|
||||
source_type: str = SOURCE_TYPE_GPS,
|
||||
consider_home: timedelta = None):
|
||||
"""Mark the device as seen."""
|
||||
self.source_type = source_type
|
||||
self.last_seen = dt_util.utcnow()
|
||||
self.host_name = host_name
|
||||
self.location_name = location_name
|
||||
self.consider_home = consider_home or self.consider_home
|
||||
|
||||
if battery:
|
||||
self.battery = battery
|
||||
if attributes:
|
||||
self._attributes.update(attributes)
|
||||
|
||||
self.gps = None
|
||||
|
||||
if gps is not None:
|
||||
try:
|
||||
self.gps = float(gps[0]), float(gps[1])
|
||||
self.gps_accuracy = gps_accuracy or 0
|
||||
except (ValueError, TypeError, IndexError):
|
||||
self.gps = None
|
||||
self.gps_accuracy = 0
|
||||
LOGGER.warning(
|
||||
"Could not parse gps value for %s: %s", self.dev_id, gps)
|
||||
|
||||
# pylint: disable=not-an-iterable
|
||||
await self.async_update()
|
||||
|
||||
def stale(self, now: dt_util.dt.datetime = None):
|
||||
"""Return if device state is stale.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
return self.last_seen is None or \
|
||||
(now or dt_util.utcnow()) - self.last_seen > self.consider_home
|
||||
|
||||
def mark_stale(self):
|
||||
"""Mark the device state as stale."""
|
||||
self._state = STATE_NOT_HOME
|
||||
self.gps = None
|
||||
self.last_update_home = False
|
||||
|
||||
async def async_update(self):
|
||||
"""Update state of entity.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
if not self.last_seen:
|
||||
return
|
||||
if self.location_name:
|
||||
self._state = self.location_name
|
||||
elif self.gps is not None and self.source_type == SOURCE_TYPE_GPS:
|
||||
zone_state = async_active_zone(
|
||||
self.hass, self.gps[0], self.gps[1], self.gps_accuracy)
|
||||
if zone_state is None:
|
||||
self._state = STATE_NOT_HOME
|
||||
elif zone_state.entity_id == zone.ENTITY_ID_HOME:
|
||||
self._state = STATE_HOME
|
||||
else:
|
||||
self._state = zone_state.name
|
||||
elif self.stale():
|
||||
self.mark_stale()
|
||||
else:
|
||||
self._state = STATE_HOME
|
||||
self.last_update_home = True
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
"""Add an entity."""
|
||||
await super().async_added_to_hass()
|
||||
state = await self.async_get_last_state()
|
||||
if not state:
|
||||
return
|
||||
self._state = state.state
|
||||
self.last_update_home = (state.state == STATE_HOME)
|
||||
self.last_seen = dt_util.utcnow()
|
||||
|
||||
for attr, var in (
|
||||
(ATTR_SOURCE_TYPE, 'source_type'),
|
||||
(ATTR_GPS_ACCURACY, 'gps_accuracy'),
|
||||
(ATTR_BATTERY, 'battery'),
|
||||
):
|
||||
if attr in state.attributes:
|
||||
setattr(self, var, state.attributes[attr])
|
||||
|
||||
if ATTR_LONGITUDE in state.attributes:
|
||||
self.gps = (state.attributes[ATTR_LATITUDE],
|
||||
state.attributes[ATTR_LONGITUDE])
|
||||
|
||||
|
||||
class DeviceScanner:
|
||||
"""Device scanner object."""
|
||||
|
||||
hass = None # type: HomeAssistantType
|
||||
|
||||
def scan_devices(self) -> List[str]:
|
||||
"""Scan for devices."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def async_scan_devices(self) -> Any:
|
||||
"""Scan for devices.
|
||||
|
||||
This method must be run in the event loop and returns a coroutine.
|
||||
"""
|
||||
return self.hass.async_add_job(self.scan_devices)
|
||||
|
||||
def get_device_name(self, device: str) -> str:
|
||||
"""Get the name of a device."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def async_get_device_name(self, device: str) -> Any:
|
||||
"""Get the name of a device.
|
||||
|
||||
This method must be run in the event loop and returns a coroutine.
|
||||
"""
|
||||
return self.hass.async_add_job(self.get_device_name, device)
|
||||
|
||||
def get_extra_attributes(self, device: str) -> dict:
|
||||
"""Get the extra attributes of a device."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def async_get_extra_attributes(self, device: str) -> Any:
|
||||
"""Get the extra attributes of a device.
|
||||
|
||||
This method must be run in the event loop and returns a coroutine.
|
||||
"""
|
||||
return self.hass.async_add_job(self.get_extra_attributes, device)
|
||||
|
||||
|
||||
async def async_load_config(path: str, hass: HomeAssistantType,
|
||||
consider_home: timedelta):
|
||||
"""Load devices from YAML configuration file.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
dev_schema = vol.Schema({
|
||||
vol.Required(CONF_NAME): cv.string,
|
||||
vol.Optional(CONF_ICON, default=None): vol.Any(None, cv.icon),
|
||||
vol.Optional('track', default=False): cv.boolean,
|
||||
vol.Optional(CONF_MAC, default=None):
|
||||
vol.Any(None, vol.All(cv.string, vol.Upper)),
|
||||
vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
|
||||
vol.Optional('gravatar', default=None): vol.Any(None, cv.string),
|
||||
vol.Optional('picture', default=None): vol.Any(None, cv.string),
|
||||
vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All(
|
||||
cv.time_period, cv.positive_timedelta),
|
||||
})
|
||||
try:
|
||||
result = []
|
||||
try:
|
||||
devices = await hass.async_add_job(
|
||||
load_yaml_config_file, path)
|
||||
except HomeAssistantError as err:
|
||||
LOGGER.error("Unable to load %s: %s", path, str(err))
|
||||
return []
|
||||
|
||||
for dev_id, device in devices.items():
|
||||
# Deprecated option. We just ignore it to avoid breaking change
|
||||
device.pop('vendor', None)
|
||||
try:
|
||||
device = dev_schema(device)
|
||||
device['dev_id'] = cv.slugify(dev_id)
|
||||
except vol.Invalid as exp:
|
||||
async_log_exception(exp, dev_id, devices, hass)
|
||||
else:
|
||||
result.append(Device(hass, **device))
|
||||
return result
|
||||
except (HomeAssistantError, FileNotFoundError):
|
||||
# When YAML file could not be loaded/did not contain a dict
|
||||
return []
|
||||
|
||||
|
||||
def update_config(path: str, dev_id: str, device: Device):
|
||||
"""Add device to YAML configuration file."""
|
||||
with open(path, 'a') as out:
|
||||
device = {device.dev_id: {
|
||||
ATTR_NAME: device.name,
|
||||
ATTR_MAC: device.mac,
|
||||
ATTR_ICON: device.icon,
|
||||
'picture': device.config_picture,
|
||||
'track': device.track,
|
||||
CONF_AWAY_HIDE: device.away_hide,
|
||||
}}
|
||||
out.write('\n')
|
||||
out.write(dump(device))
|
||||
|
||||
|
||||
def get_gravatar_for_email(email: str):
|
||||
"""Return an 80px Gravatar for the given email address.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
import hashlib
|
||||
url = 'https://www.gravatar.com/avatar/{}.jpg?s=80&d=wavatar'
|
||||
return url.format(hashlib.md5(email.encode('utf-8').lower()).hexdigest())
|
||||
Reference in New Issue
Block a user