1
0
mirror of https://github.com/home-assistant/core.git synced 2025-12-24 12:59:34 +00:00

Collection of core tests improvements (#33757)

* Collection of core tests improvements

* Added some more

* Fix aiohttp client response release
This commit is contained in:
Franck Nijhof
2020-04-07 18:33:23 +02:00
committed by GitHub
parent 1f7803c541
commit 60bc517d01
12 changed files with 302 additions and 343 deletions

View File

@@ -1,143 +1,131 @@
"""Test dispatcher helpers."""
import asyncio
from functools import partial
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
dispatcher_connect,
dispatcher_send,
async_dispatcher_send,
)
from tests.common import get_test_home_assistant
async def test_simple_function(hass):
"""Test simple function (executor)."""
calls = []
def test_funct(data):
"""Test function."""
calls.append(data)
async_dispatcher_connect(hass, "test", test_funct)
async_dispatcher_send(hass, "test", 3)
await hass.async_block_till_done()
assert calls == [3]
async_dispatcher_send(hass, "test", "bla")
await hass.async_block_till_done()
assert calls == [3, "bla"]
class TestHelpersDispatcher:
"""Tests for discovery helper methods."""
async def test_simple_function_unsub(hass):
"""Test simple function (executor) and unsub."""
calls1 = []
calls2 = []
def setup_method(self, method):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
def test_funct1(data):
"""Test function."""
calls1.append(data)
def teardown_method(self, method):
"""Stop everything that was started."""
self.hass.stop()
def test_funct2(data):
"""Test function."""
calls2.append(data)
def test_simple_function(self):
"""Test simple function (executor)."""
calls = []
async_dispatcher_connect(hass, "test1", test_funct1)
unsub = async_dispatcher_connect(hass, "test2", test_funct2)
async_dispatcher_send(hass, "test1", 3)
async_dispatcher_send(hass, "test2", 4)
await hass.async_block_till_done()
def test_funct(data):
"""Test function."""
calls.append(data)
assert calls1 == [3]
assert calls2 == [4]
dispatcher_connect(self.hass, "test", test_funct)
dispatcher_send(self.hass, "test", 3)
self.hass.block_till_done()
unsub()
assert calls == [3]
async_dispatcher_send(hass, "test1", 5)
async_dispatcher_send(hass, "test2", 6)
await hass.async_block_till_done()
dispatcher_send(self.hass, "test", "bla")
self.hass.block_till_done()
assert calls1 == [3, 5]
assert calls2 == [4]
assert calls == [3, "bla"]
# check don't kill the flow
unsub()
def test_simple_function_unsub(self):
"""Test simple function (executor) and unsub."""
calls1 = []
calls2 = []
async_dispatcher_send(hass, "test1", 7)
async_dispatcher_send(hass, "test2", 8)
await hass.async_block_till_done()
def test_funct1(data):
"""Test function."""
calls1.append(data)
assert calls1 == [3, 5, 7]
assert calls2 == [4]
def test_funct2(data):
"""Test function."""
calls2.append(data)
dispatcher_connect(self.hass, "test1", test_funct1)
unsub = dispatcher_connect(self.hass, "test2", test_funct2)
dispatcher_send(self.hass, "test1", 3)
dispatcher_send(self.hass, "test2", 4)
self.hass.block_till_done()
async def test_simple_callback(hass):
"""Test simple callback (async)."""
calls = []
assert calls1 == [3]
assert calls2 == [4]
@callback
def test_funct(data):
"""Test function."""
calls.append(data)
unsub()
async_dispatcher_connect(hass, "test", test_funct)
async_dispatcher_send(hass, "test", 3)
await hass.async_block_till_done()
dispatcher_send(self.hass, "test1", 5)
dispatcher_send(self.hass, "test2", 6)
self.hass.block_till_done()
assert calls == [3]
assert calls1 == [3, 5]
assert calls2 == [4]
async_dispatcher_send(hass, "test", "bla")
await hass.async_block_till_done()
# check don't kill the flow
unsub()
assert calls == [3, "bla"]
dispatcher_send(self.hass, "test1", 7)
dispatcher_send(self.hass, "test2", 8)
self.hass.block_till_done()
assert calls1 == [3, 5, 7]
assert calls2 == [4]
async def test_simple_coro(hass):
"""Test simple coro (async)."""
calls = []
def test_simple_callback(self):
"""Test simple callback (async)."""
calls = []
async def async_test_funct(data):
"""Test function."""
calls.append(data)
@callback
def test_funct(data):
"""Test function."""
calls.append(data)
async_dispatcher_connect(hass, "test", async_test_funct)
async_dispatcher_send(hass, "test", 3)
await hass.async_block_till_done()
dispatcher_connect(self.hass, "test", test_funct)
dispatcher_send(self.hass, "test", 3)
self.hass.block_till_done()
assert calls == [3]
assert calls == [3]
async_dispatcher_send(hass, "test", "bla")
await hass.async_block_till_done()
dispatcher_send(self.hass, "test", "bla")
self.hass.block_till_done()
assert calls == [3, "bla"]
assert calls == [3, "bla"]
def test_simple_coro(self):
"""Test simple coro (async)."""
calls = []
async def test_simple_function_multiargs(hass):
"""Test simple function (executor)."""
calls = []
@asyncio.coroutine
def test_funct(data):
"""Test function."""
calls.append(data)
def test_funct(data1, data2, data3):
"""Test function."""
calls.append(data1)
calls.append(data2)
calls.append(data3)
dispatcher_connect(self.hass, "test", test_funct)
dispatcher_send(self.hass, "test", 3)
self.hass.block_till_done()
async_dispatcher_connect(hass, "test", test_funct)
async_dispatcher_send(hass, "test", 3, 2, "bla")
await hass.async_block_till_done()
assert calls == [3]
dispatcher_send(self.hass, "test", "bla")
self.hass.block_till_done()
assert calls == [3, "bla"]
def test_simple_function_multiargs(self):
"""Test simple function (executor)."""
calls = []
def test_funct(data1, data2, data3):
"""Test function."""
calls.append(data1)
calls.append(data2)
calls.append(data3)
dispatcher_connect(self.hass, "test", test_funct)
dispatcher_send(self.hass, "test", 3, 2, "bla")
self.hass.block_till_done()
assert calls == [3, 2, "bla"]
assert calls == [3, 2, "bla"]
async def test_callback_exception_gets_logged(hass, caplog):
@@ -150,7 +138,7 @@ async def test_callback_exception_gets_logged(hass, caplog):
# wrap in partial to test message logging.
async_dispatcher_connect(hass, "test", partial(bad_handler))
dispatcher_send(hass, "test", "bad")
async_dispatcher_send(hass, "test", "bad")
await hass.async_block_till_done()
await hass.async_block_till_done()