mirror of
https://github.com/home-assistant/core.git
synced 2025-12-24 04:50:05 +00:00
Fix tests (#7659)
* Remove global hass * Http.auth test no longer spin up server * Remove server usage from http.ban test * Remove setupModule from test device_sun_light_trigger * Update common.py
This commit is contained in:
@@ -1,394 +1,380 @@
|
||||
"""The tests for the Home Assistant API component."""
|
||||
# pylint: disable=protected-access
|
||||
from contextlib import closing
|
||||
import asyncio
|
||||
import json
|
||||
import unittest
|
||||
|
||||
import requests
|
||||
import pytest
|
||||
|
||||
from homeassistant import setup, const
|
||||
from homeassistant import const
|
||||
import homeassistant.core as ha
|
||||
import homeassistant.components.http as http
|
||||
|
||||
from tests.common import get_test_instance_port, get_test_home_assistant
|
||||
|
||||
API_PASSWORD = "test1234"
|
||||
SERVER_PORT = get_test_instance_port()
|
||||
HTTP_BASE_URL = "http://127.0.0.1:{}".format(SERVER_PORT)
|
||||
HA_HEADERS = {
|
||||
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
|
||||
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
|
||||
}
|
||||
|
||||
hass = None
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
||||
def _url(path=""):
|
||||
"""Helper method to generate URLs."""
|
||||
return HTTP_BASE_URL + path
|
||||
@pytest.fixture
|
||||
def mock_api_client(hass, test_client):
|
||||
"""Start the Hass HTTP component."""
|
||||
hass.loop.run_until_complete(async_setup_component(hass, 'api', {}))
|
||||
return hass.loop.run_until_complete(test_client(hass.http.app))
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_list_state_entities(hass, mock_api_client):
|
||||
"""Test if the debug interface allows us to list state entities."""
|
||||
hass.states.async_set('test.entity', 'hello')
|
||||
resp = yield from mock_api_client.get(const.URL_API_STATES)
|
||||
assert resp.status == 200
|
||||
json = yield from resp.json()
|
||||
|
||||
remote_data = [ha.State.from_dict(item) for item in json]
|
||||
assert remote_data == hass.states.async_all()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_get_state(hass, mock_api_client):
|
||||
"""Test if the debug interface allows us to get a state."""
|
||||
hass.states.async_set('hello.world', 'nice', {
|
||||
'attr': 1,
|
||||
})
|
||||
resp = yield from mock_api_client.get(
|
||||
const.URL_API_STATES_ENTITY.format("hello.world"))
|
||||
assert resp.status == 200
|
||||
json = yield from resp.json()
|
||||
|
||||
data = ha.State.from_dict(json)
|
||||
|
||||
state = hass.states.get("hello.world")
|
||||
|
||||
assert data.state == state.state
|
||||
assert data.last_changed == state.last_changed
|
||||
assert data.attributes == state.attributes
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_get_non_existing_state(hass, mock_api_client):
|
||||
"""Test if the debug interface allows us to get a state."""
|
||||
resp = yield from mock_api_client.get(
|
||||
const.URL_API_STATES_ENTITY.format("does_not_exist"))
|
||||
assert resp.status == 404
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_state_change(hass, mock_api_client):
|
||||
"""Test if we can change the state of an entity that exists."""
|
||||
hass.states.async_set("test.test", "not_to_be_set")
|
||||
|
||||
yield from mock_api_client.post(
|
||||
const.URL_API_STATES_ENTITY.format("test.test"),
|
||||
json={"state": "debug_state_change2"})
|
||||
|
||||
assert hass.states.get("test.test").state == "debug_state_change2"
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def setUpModule():
|
||||
"""Initialize a Home Assistant server."""
|
||||
global hass
|
||||
@asyncio.coroutine
|
||||
def test_api_state_change_of_non_existing_entity(hass, mock_api_client):
|
||||
"""Test if changing a state of a non existing entity is possible."""
|
||||
new_state = "debug_state_change"
|
||||
|
||||
hass = get_test_home_assistant()
|
||||
resp = yield from mock_api_client.post(
|
||||
const.URL_API_STATES_ENTITY.format("test_entity.that_does_not_exist"),
|
||||
json={'state': new_state})
|
||||
|
||||
hass.bus.listen('test_event', lambda _: _)
|
||||
hass.states.set('test.test', 'a_state')
|
||||
assert resp.status == 201
|
||||
|
||||
setup.setup_component(
|
||||
hass, http.DOMAIN,
|
||||
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD,
|
||||
http.CONF_SERVER_PORT: SERVER_PORT}})
|
||||
|
||||
setup.setup_component(hass, 'api')
|
||||
|
||||
hass.start()
|
||||
assert hass.states.get("test_entity.that_does_not_exist").state == \
|
||||
new_state
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def tearDownModule():
|
||||
"""Stop the Home Assistant server."""
|
||||
hass.stop()
|
||||
@asyncio.coroutine
|
||||
def test_api_state_change_with_bad_data(hass, mock_api_client):
|
||||
"""Test if API sends appropriate error if we omit state."""
|
||||
resp = yield from mock_api_client.post(
|
||||
const.URL_API_STATES_ENTITY.format("test_entity.that_does_not_exist"),
|
||||
json={})
|
||||
|
||||
assert resp.status == 400
|
||||
|
||||
|
||||
class TestAPI(unittest.TestCase):
|
||||
"""Test the API."""
|
||||
# pylint: disable=invalid-name
|
||||
@asyncio.coroutine
|
||||
def test_api_state_change_push(hass, mock_api_client):
|
||||
"""Test if we can push a change the state of an entity."""
|
||||
hass.states.async_set("test.test", "not_to_be_set")
|
||||
|
||||
def tearDown(self):
|
||||
"""Stop everything that was started."""
|
||||
hass.block_till_done()
|
||||
events = []
|
||||
|
||||
def test_api_list_state_entities(self):
|
||||
"""Test if the debug interface allows us to list state entities."""
|
||||
req = requests.get(_url(const.URL_API_STATES),
|
||||
headers=HA_HEADERS)
|
||||
@ha.callback
|
||||
def event_listener(event):
|
||||
"""Track events."""
|
||||
events.append(event)
|
||||
|
||||
remote_data = [ha.State.from_dict(item) for item in req.json()]
|
||||
hass.bus.async_listen(const.EVENT_STATE_CHANGED, event_listener)
|
||||
|
||||
self.assertEqual(hass.states.all(), remote_data)
|
||||
yield from mock_api_client.post(
|
||||
const.URL_API_STATES_ENTITY.format("test.test"),
|
||||
json={"state": "not_to_be_set"})
|
||||
yield from hass.async_block_till_done()
|
||||
assert len(events) == 0
|
||||
|
||||
def test_api_get_state(self):
|
||||
"""Test if the debug interface allows us to get a state."""
|
||||
req = requests.get(
|
||||
_url(const.URL_API_STATES_ENTITY.format("test.test")),
|
||||
headers=HA_HEADERS)
|
||||
yield from mock_api_client.post(
|
||||
const.URL_API_STATES_ENTITY.format("test.test"),
|
||||
json={"state": "not_to_be_set", "force_update": True})
|
||||
yield from hass.async_block_till_done()
|
||||
assert len(events) == 1
|
||||
|
||||
data = ha.State.from_dict(req.json())
|
||||
|
||||
state = hass.states.get("test.test")
|
||||
# pylint: disable=invalid-name
|
||||
@asyncio.coroutine
|
||||
def test_api_fire_event_with_no_data(hass, mock_api_client):
|
||||
"""Test if the API allows us to fire an event."""
|
||||
test_value = []
|
||||
|
||||
self.assertEqual(state.state, data.state)
|
||||
self.assertEqual(state.last_changed, data.last_changed)
|
||||
self.assertEqual(state.attributes, data.attributes)
|
||||
@ha.callback
|
||||
def listener(event):
|
||||
"""Helper method that will verify our event got called."""
|
||||
test_value.append(1)
|
||||
|
||||
def test_api_get_non_existing_state(self):
|
||||
"""Test if the debug interface allows us to get a state."""
|
||||
req = requests.get(
|
||||
_url(const.URL_API_STATES_ENTITY.format("does_not_exist")),
|
||||
headers=HA_HEADERS)
|
||||
hass.bus.async_listen_once("test.event_no_data", listener)
|
||||
|
||||
self.assertEqual(404, req.status_code)
|
||||
yield from mock_api_client.post(
|
||||
const.URL_API_EVENTS_EVENT.format("test.event_no_data"))
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
def test_api_state_change(self):
|
||||
"""Test if we can change the state of an entity that exists."""
|
||||
hass.states.set("test.test", "not_to_be_set")
|
||||
assert len(test_value) == 1
|
||||
|
||||
requests.post(_url(const.URL_API_STATES_ENTITY.format("test.test")),
|
||||
data=json.dumps({"state": "debug_state_change2"}),
|
||||
headers=HA_HEADERS)
|
||||
|
||||
self.assertEqual("debug_state_change2",
|
||||
hass.states.get("test.test").state)
|
||||
# pylint: disable=invalid-name
|
||||
@asyncio.coroutine
|
||||
def test_api_fire_event_with_data(hass, mock_api_client):
|
||||
"""Test if the API allows us to fire an event."""
|
||||
test_value = []
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def test_api_state_change_of_non_existing_entity(self):
|
||||
"""Test if changing a state of a non existing entity is possible."""
|
||||
new_state = "debug_state_change"
|
||||
@ha.callback
|
||||
def listener(event):
|
||||
"""Helper method that will verify that our event got called.
|
||||
|
||||
req = requests.post(
|
||||
_url(const.URL_API_STATES_ENTITY.format(
|
||||
"test_entity.that_does_not_exist")),
|
||||
data=json.dumps({'state': new_state}),
|
||||
headers=HA_HEADERS)
|
||||
|
||||
cur_state = (hass.states.
|
||||
get("test_entity.that_does_not_exist").state)
|
||||
|
||||
self.assertEqual(201, req.status_code)
|
||||
self.assertEqual(cur_state, new_state)
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def test_api_state_change_with_bad_data(self):
|
||||
"""Test if API sends appropriate error if we omit state."""
|
||||
req = requests.post(
|
||||
_url(const.URL_API_STATES_ENTITY.format(
|
||||
"test_entity.that_does_not_exist")),
|
||||
data=json.dumps({}),
|
||||
headers=HA_HEADERS)
|
||||
|
||||
self.assertEqual(400, req.status_code)
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def test_api_state_change_push(self):
|
||||
"""Test if we can push a change the state of an entity."""
|
||||
hass.states.set("test.test", "not_to_be_set")
|
||||
|
||||
events = []
|
||||
hass.bus.listen(const.EVENT_STATE_CHANGED,
|
||||
lambda ev: events.append(ev))
|
||||
|
||||
requests.post(_url(const.URL_API_STATES_ENTITY.format("test.test")),
|
||||
data=json.dumps({"state": "not_to_be_set"}),
|
||||
headers=HA_HEADERS)
|
||||
hass.block_till_done()
|
||||
self.assertEqual(0, len(events))
|
||||
|
||||
requests.post(_url(const.URL_API_STATES_ENTITY.format("test.test")),
|
||||
data=json.dumps({"state": "not_to_be_set",
|
||||
"force_update": True}),
|
||||
headers=HA_HEADERS)
|
||||
hass.block_till_done()
|
||||
self.assertEqual(1, len(events))
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def test_api_fire_event_with_no_data(self):
|
||||
"""Test if the API allows us to fire an event."""
|
||||
test_value = []
|
||||
|
||||
def listener(event):
|
||||
"""Helper method that will verify our event got called."""
|
||||
Also test if our data came through.
|
||||
"""
|
||||
if "test" in event.data:
|
||||
test_value.append(1)
|
||||
|
||||
hass.bus.listen_once("test.event_no_data", listener)
|
||||
hass.bus.async_listen_once("test_event_with_data", listener)
|
||||
|
||||
requests.post(
|
||||
_url(const.URL_API_EVENTS_EVENT.format("test.event_no_data")),
|
||||
headers=HA_HEADERS)
|
||||
yield from mock_api_client.post(
|
||||
const.URL_API_EVENTS_EVENT.format("test_event_with_data"),
|
||||
json={"test": 1})
|
||||
|
||||
hass.block_till_done()
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
self.assertEqual(1, len(test_value))
|
||||
assert len(test_value) == 1
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def test_api_fire_event_with_data(self):
|
||||
"""Test if the API allows us to fire an event."""
|
||||
test_value = []
|
||||
|
||||
def listener(event):
|
||||
"""Helper method that will verify that our event got called.
|
||||
# pylint: disable=invalid-name
|
||||
@asyncio.coroutine
|
||||
def test_api_fire_event_with_invalid_json(hass, mock_api_client):
|
||||
"""Test if the API allows us to fire an event."""
|
||||
test_value = []
|
||||
|
||||
Also test if our data came through.
|
||||
"""
|
||||
if "test" in event.data:
|
||||
test_value.append(1)
|
||||
@ha.callback
|
||||
def listener(event):
|
||||
"""Helper method that will verify our event got called."""
|
||||
test_value.append(1)
|
||||
|
||||
hass.bus.listen_once("test_event_with_data", listener)
|
||||
hass.bus.async_listen_once("test_event_bad_data", listener)
|
||||
|
||||
requests.post(
|
||||
_url(const.URL_API_EVENTS_EVENT.format("test_event_with_data")),
|
||||
data=json.dumps({"test": 1}),
|
||||
headers=HA_HEADERS)
|
||||
resp = yield from mock_api_client.post(
|
||||
const.URL_API_EVENTS_EVENT.format("test_event_bad_data"),
|
||||
data=json.dumps('not an object'))
|
||||
|
||||
hass.block_till_done()
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
self.assertEqual(1, len(test_value))
|
||||
assert resp.status == 400
|
||||
assert len(test_value) == 0
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def test_api_fire_event_with_invalid_json(self):
|
||||
"""Test if the API allows us to fire an event."""
|
||||
test_value = []
|
||||
# Try now with valid but unusable JSON
|
||||
resp = yield from mock_api_client.post(
|
||||
const.URL_API_EVENTS_EVENT.format("test_event_bad_data"),
|
||||
data=json.dumps([1, 2, 3]))
|
||||
|
||||
def listener(event):
|
||||
"""Helper method that will verify our event got called."""
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
assert resp.status == 400
|
||||
assert len(test_value) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_get_config(hass, mock_api_client):
|
||||
"""Test the return of the configuration."""
|
||||
resp = yield from mock_api_client.get(const.URL_API_CONFIG)
|
||||
result = yield from resp.json()
|
||||
if 'components' in result:
|
||||
result['components'] = set(result['components'])
|
||||
|
||||
assert hass.config.as_dict() == result
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_get_components(hass, mock_api_client):
|
||||
"""Test the return of the components."""
|
||||
resp = yield from mock_api_client.get(const.URL_API_COMPONENTS)
|
||||
result = yield from resp.json()
|
||||
assert set(result) == hass.config.components
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_get_event_listeners(hass, mock_api_client):
|
||||
"""Test if we can get the list of events being listened for."""
|
||||
resp = yield from mock_api_client.get(const.URL_API_EVENTS)
|
||||
data = yield from resp.json()
|
||||
|
||||
local = hass.bus.async_listeners()
|
||||
|
||||
for event in data:
|
||||
assert local.pop(event["event"]) == event["listener_count"]
|
||||
|
||||
assert len(local) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_get_services(hass, mock_api_client):
|
||||
"""Test if we can get a dict describing current services."""
|
||||
resp = yield from mock_api_client.get(const.URL_API_SERVICES)
|
||||
data = yield from resp.json()
|
||||
local_services = hass.services.async_services()
|
||||
|
||||
for serv_domain in data:
|
||||
local = local_services.pop(serv_domain["domain"])
|
||||
|
||||
assert serv_domain["services"] == local
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_call_service_no_data(hass, mock_api_client):
|
||||
"""Test if the API allows us to call a service."""
|
||||
test_value = []
|
||||
|
||||
@ha.callback
|
||||
def listener(service_call):
|
||||
"""Helper method that will verify that our service got called."""
|
||||
test_value.append(1)
|
||||
|
||||
hass.services.async_register("test_domain", "test_service", listener)
|
||||
|
||||
yield from mock_api_client.post(
|
||||
const.URL_API_SERVICES_SERVICE.format(
|
||||
"test_domain", "test_service"))
|
||||
yield from hass.async_block_till_done()
|
||||
assert len(test_value) == 1
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_api_call_service_with_data(hass, mock_api_client):
|
||||
"""Test if the API allows us to call a service."""
|
||||
test_value = []
|
||||
|
||||
@ha.callback
|
||||
def listener(service_call):
|
||||
"""Helper method that will verify that our service got called.
|
||||
|
||||
Also test if our data came through.
|
||||
"""
|
||||
if "test" in service_call.data:
|
||||
test_value.append(1)
|
||||
|
||||
hass.bus.listen_once("test_event_bad_data", listener)
|
||||
hass.services.async_register("test_domain", "test_service", listener)
|
||||
|
||||
req = requests.post(
|
||||
_url(const.URL_API_EVENTS_EVENT.format("test_event_bad_data")),
|
||||
data=json.dumps('not an object'),
|
||||
headers=HA_HEADERS)
|
||||
yield from mock_api_client.post(
|
||||
const.URL_API_SERVICES_SERVICE.format("test_domain", "test_service"),
|
||||
json={"test": 1})
|
||||
|
||||
hass.block_till_done()
|
||||
yield from hass.async_block_till_done()
|
||||
assert len(test_value) == 1
|
||||
|
||||
self.assertEqual(400, req.status_code)
|
||||
self.assertEqual(0, len(test_value))
|
||||
|
||||
# Try now with valid but unusable JSON
|
||||
req = requests.post(
|
||||
_url(const.URL_API_EVENTS_EVENT.format("test_event_bad_data")),
|
||||
data=json.dumps([1, 2, 3]),
|
||||
headers=HA_HEADERS)
|
||||
@asyncio.coroutine
|
||||
def test_api_template(hass, mock_api_client):
|
||||
"""Test the template API."""
|
||||
hass.states.async_set('sensor.temperature', 10)
|
||||
|
||||
hass.block_till_done()
|
||||
resp = yield from mock_api_client.post(
|
||||
const.URL_API_TEMPLATE,
|
||||
json={"template": '{{ states.sensor.temperature.state }}'})
|
||||
|
||||
self.assertEqual(400, req.status_code)
|
||||
self.assertEqual(0, len(test_value))
|
||||
body = yield from resp.text()
|
||||
|
||||
def test_api_get_config(self):
|
||||
"""Test the return of the configuration."""
|
||||
req = requests.get(_url(const.URL_API_CONFIG),
|
||||
headers=HA_HEADERS)
|
||||
result = req.json()
|
||||
if 'components' in result:
|
||||
result['components'] = set(result['components'])
|
||||
assert body == '10'
|
||||
|
||||
self.assertEqual(hass.config.as_dict(), result)
|
||||
|
||||
def test_api_get_components(self):
|
||||
"""Test the return of the components."""
|
||||
req = requests.get(_url(const.URL_API_COMPONENTS),
|
||||
headers=HA_HEADERS)
|
||||
self.assertEqual(hass.config.components, set(req.json()))
|
||||
@asyncio.coroutine
|
||||
def test_api_template_error(hass, mock_api_client):
|
||||
"""Test the template API."""
|
||||
hass.states.async_set('sensor.temperature', 10)
|
||||
|
||||
def test_api_get_event_listeners(self):
|
||||
"""Test if we can get the list of events being listened for."""
|
||||
req = requests.get(_url(const.URL_API_EVENTS),
|
||||
headers=HA_HEADERS)
|
||||
resp = yield from mock_api_client.post(
|
||||
const.URL_API_TEMPLATE,
|
||||
json={"template": '{{ states.sensor.temperature.state'})
|
||||
|
||||
local = hass.bus.listeners
|
||||
assert resp.status == 400
|
||||
|
||||
for event in req.json():
|
||||
self.assertEqual(event["listener_count"],
|
||||
local.pop(event["event"]))
|
||||
|
||||
self.assertEqual(0, len(local))
|
||||
@asyncio.coroutine
|
||||
def test_stream(hass, mock_api_client):
|
||||
"""Test the stream."""
|
||||
listen_count = _listen_count(hass)
|
||||
|
||||
def test_api_get_services(self):
|
||||
"""Test if we can get a dict describing current services."""
|
||||
req = requests.get(_url(const.URL_API_SERVICES),
|
||||
headers=HA_HEADERS)
|
||||
resp = yield from mock_api_client.get(const.URL_API_STREAM)
|
||||
assert resp.status == 200
|
||||
assert listen_count + 1 == _listen_count(hass)
|
||||
|
||||
local_services = hass.services.services
|
||||
hass.bus.async_fire('test_event')
|
||||
|
||||
for serv_domain in req.json():
|
||||
local = local_services.pop(serv_domain["domain"])
|
||||
data = yield from _stream_next_event(resp.content)
|
||||
|
||||
self.assertEqual(local, serv_domain["services"])
|
||||
assert data['event_type'] == 'test_event'
|
||||
|
||||
def test_api_call_service_no_data(self):
|
||||
"""Test if the API allows us to call a service."""
|
||||
test_value = []
|
||||
|
||||
@ha.callback
|
||||
def listener(service_call):
|
||||
"""Helper method that will verify that our service got called."""
|
||||
test_value.append(1)
|
||||
@asyncio.coroutine
|
||||
def test_stream_with_restricted(hass, mock_api_client):
|
||||
"""Test the stream with restrictions."""
|
||||
listen_count = _listen_count(hass)
|
||||
|
||||
hass.services.register("test_domain", "test_service", listener)
|
||||
resp = yield from mock_api_client.get(
|
||||
'{}?restrict=test_event1,test_event3'.format(const.URL_API_STREAM))
|
||||
assert resp.status == 200
|
||||
assert listen_count + 1 == _listen_count(hass)
|
||||
|
||||
requests.post(
|
||||
_url(const.URL_API_SERVICES_SERVICE.format(
|
||||
"test_domain", "test_service")),
|
||||
headers=HA_HEADERS)
|
||||
hass.bus.async_fire('test_event1')
|
||||
data = yield from _stream_next_event(resp.content)
|
||||
assert data['event_type'] == 'test_event1'
|
||||
|
||||
hass.block_till_done()
|
||||
hass.bus.async_fire('test_event2')
|
||||
hass.bus.async_fire('test_event3')
|
||||
data = yield from _stream_next_event(resp.content)
|
||||
assert data['event_type'] == 'test_event3'
|
||||
|
||||
self.assertEqual(1, len(test_value))
|
||||
|
||||
def test_api_call_service_with_data(self):
|
||||
"""Test if the API allows us to call a service."""
|
||||
test_value = []
|
||||
@asyncio.coroutine
|
||||
def _stream_next_event(stream):
|
||||
"""Read the stream for next event while ignoring ping."""
|
||||
while True:
|
||||
last_new_line = False
|
||||
data = b''
|
||||
|
||||
@ha.callback
|
||||
def listener(service_call):
|
||||
"""Helper method that will verify that our service got called.
|
||||
|
||||
Also test if our data came through.
|
||||
"""
|
||||
if "test" in service_call.data:
|
||||
test_value.append(1)
|
||||
|
||||
hass.services.register("test_domain", "test_service", listener)
|
||||
|
||||
requests.post(
|
||||
_url(const.URL_API_SERVICES_SERVICE.format(
|
||||
"test_domain", "test_service")),
|
||||
data=json.dumps({"test": 1}),
|
||||
headers=HA_HEADERS)
|
||||
|
||||
hass.block_till_done()
|
||||
|
||||
self.assertEqual(1, len(test_value))
|
||||
|
||||
def test_api_template(self):
|
||||
"""Test the template API."""
|
||||
hass.states.set('sensor.temperature', 10)
|
||||
|
||||
req = requests.post(
|
||||
_url(const.URL_API_TEMPLATE),
|
||||
json={"template": '{{ states.sensor.temperature.state }}'},
|
||||
headers=HA_HEADERS)
|
||||
|
||||
self.assertEqual('10', req.text)
|
||||
|
||||
def test_api_template_error(self):
|
||||
"""Test the template API."""
|
||||
hass.states.set('sensor.temperature', 10)
|
||||
|
||||
req = requests.post(
|
||||
_url(const.URL_API_TEMPLATE),
|
||||
data=json.dumps({"template":
|
||||
'{{ states.sensor.temperature.state'}),
|
||||
headers=HA_HEADERS)
|
||||
|
||||
self.assertEqual(400, req.status_code)
|
||||
|
||||
def test_stream(self):
|
||||
"""Test the stream."""
|
||||
listen_count = self._listen_count()
|
||||
with closing(requests.get(_url(const.URL_API_STREAM), timeout=3,
|
||||
stream=True, headers=HA_HEADERS)) as req:
|
||||
stream = req.iter_content(1)
|
||||
self.assertEqual(listen_count + 1, self._listen_count())
|
||||
|
||||
hass.bus.fire('test_event')
|
||||
|
||||
data = self._stream_next_event(stream)
|
||||
|
||||
self.assertEqual('test_event', data['event_type'])
|
||||
|
||||
def test_stream_with_restricted(self):
|
||||
"""Test the stream with restrictions."""
|
||||
listen_count = self._listen_count()
|
||||
url = _url('{}?restrict=test_event1,test_event3'.format(
|
||||
const.URL_API_STREAM))
|
||||
with closing(requests.get(url, stream=True, timeout=3,
|
||||
headers=HA_HEADERS)) as req:
|
||||
stream = req.iter_content(1)
|
||||
self.assertEqual(listen_count + 1, self._listen_count())
|
||||
|
||||
hass.bus.fire('test_event1')
|
||||
data = self._stream_next_event(stream)
|
||||
self.assertEqual('test_event1', data['event_type'])
|
||||
|
||||
hass.bus.fire('test_event2')
|
||||
hass.bus.fire('test_event3')
|
||||
|
||||
data = self._stream_next_event(stream)
|
||||
self.assertEqual('test_event3', data['event_type'])
|
||||
|
||||
def _stream_next_event(self, stream):
|
||||
"""Read the stream for next event while ignoring ping."""
|
||||
while True:
|
||||
data = b''
|
||||
last_new_line = False
|
||||
for dat in stream:
|
||||
if dat == b'\n' and last_new_line:
|
||||
break
|
||||
data += dat
|
||||
last_new_line = dat == b'\n'
|
||||
|
||||
conv = data.decode('utf-8').strip()[6:]
|
||||
|
||||
if conv != 'ping':
|
||||
dat = yield from stream.read(1)
|
||||
if dat == b'\n' and last_new_line:
|
||||
break
|
||||
data += dat
|
||||
last_new_line = dat == b'\n'
|
||||
|
||||
return json.loads(conv)
|
||||
conv = data.decode('utf-8').strip()[6:]
|
||||
|
||||
def _listen_count(self):
|
||||
"""Return number of event listeners."""
|
||||
return sum(hass.bus.listeners.values())
|
||||
if conv != 'ping':
|
||||
break
|
||||
return json.loads(conv)
|
||||
|
||||
|
||||
def _listen_count(hass):
|
||||
"""Return number of event listeners."""
|
||||
return sum(hass.bus.async_listeners().values())
|
||||
|
||||
Reference in New Issue
Block a user