mirror of
https://github.com/home-assistant/core.git
synced 2025-12-24 12:59:34 +00:00
Fix MQTT retained message not being re-dispatched (#12004)
* Fix MQTT retained message not being re-dispatched * Fix tests * Use paho-mqtt for retained messages * Improve code style * Store list of subscribers * Fix lint error * Adhere to Home Assistant's logging standard "Try to avoid brackets and additional quotes around the output to make it easier for users to parse the log." - https://home-assistant.io/developers/development_guidelines/ * Add reconnect tests * Fix lint error * Introduce Subscription Tests still need to be updated * Use namedtuple for MQTT messages ... And fix issues Accessing the config manually at runtime isn't ideal * Fix MQTT __init__.py tests * Updated usage of Mocks * Moved tests that were testing subscriptions out of the MQTTComponent test, because of how mock.patch was used * Adjusted the remaining tests for the MQTT clients new behavior - e.g. self.progress was removed * Updated the async_fire_mqtt_message helper * ✅ Update MQTT tests * Re-introduce the MQTT subscriptions through the dispatcher for tests - quite ugly though... 🚧 * Update fixtures to use our new MQTT mock 🎨 * 📝 Update base code according to comments * 🔨 Adjust MQTT test base * 🔨 Update other MQTT tests * 🍎 Fix carriage return in source files Apparently test_mqtt_json.py and test_mqtt_template.py were written on Windows. In order to not mess up the diff, I'll just redo the carriage return. * 🎨 Remove unused import * 📝 Remove fire_mqtt_client_message * 🐛 Fix using python 3.6 method What's very interesting is that 3.4 didn't fail on travis... * 🐛 Fix using assert directly
This commit is contained in:
committed by
Paulus Schoutsen
parent
17e5740a0c
commit
b1c0cabe6c
@@ -1,6 +1,5 @@
|
||||
"""The tests for the MQTT component."""
|
||||
import asyncio
|
||||
from collections import namedtuple, OrderedDict
|
||||
import unittest
|
||||
from unittest import mock
|
||||
import socket
|
||||
@@ -9,26 +8,27 @@ import ssl
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.setup import setup_component, async_setup_component
|
||||
from homeassistant.setup import async_setup_component
|
||||
import homeassistant.components.mqtt as mqtt
|
||||
from homeassistant.const import (
|
||||
EVENT_CALL_SERVICE, ATTR_DOMAIN, ATTR_SERVICE, EVENT_HOMEASSISTANT_STOP)
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.const import (EVENT_CALL_SERVICE, ATTR_DOMAIN, ATTR_SERVICE,
|
||||
EVENT_HOMEASSISTANT_STOP)
|
||||
|
||||
from tests.common import (
|
||||
get_test_home_assistant, mock_mqtt_component, fire_mqtt_message, mock_coro)
|
||||
from tests.common import (get_test_home_assistant, mock_coro,
|
||||
mock_mqtt_component,
|
||||
threadsafe_coroutine_factory, fire_mqtt_message,
|
||||
async_fire_mqtt_message)
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def mock_mqtt_client(hass, config=None):
|
||||
def async_mock_mqtt_client(hass, config=None):
|
||||
"""Mock the MQTT paho client."""
|
||||
if config is None:
|
||||
config = {
|
||||
mqtt.CONF_BROKER: 'mock-broker'
|
||||
}
|
||||
config = {mqtt.CONF_BROKER: 'mock-broker'}
|
||||
|
||||
with mock.patch('paho.mqtt.client.Client') as mock_client:
|
||||
mock_client().connect = lambda *args: 0
|
||||
mock_client().connect.return_value = 0
|
||||
mock_client().subscribe.return_value = (0, 0)
|
||||
mock_client().publish.return_value = (0, 0)
|
||||
result = yield from async_setup_component(hass, mqtt.DOMAIN, {
|
||||
mqtt.DOMAIN: config
|
||||
})
|
||||
@@ -36,8 +36,11 @@ def mock_mqtt_client(hass, config=None):
|
||||
return mock_client()
|
||||
|
||||
|
||||
mock_mqtt_client = threadsafe_coroutine_factory(async_mock_mqtt_client)
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
class TestMQTT(unittest.TestCase):
|
||||
class TestMQTTComponent(unittest.TestCase):
|
||||
"""Test the MQTT component."""
|
||||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
@@ -55,12 +58,8 @@ class TestMQTT(unittest.TestCase):
|
||||
"""Helper for recording calls."""
|
||||
self.calls.append(args)
|
||||
|
||||
def test_client_starts_on_home_assistant_mqtt_setup(self):
|
||||
"""Test if client is connect after mqtt init on bootstrap."""
|
||||
assert self.hass.data['mqtt'].async_connect.called
|
||||
|
||||
def test_client_stops_on_home_assistant_start(self):
|
||||
"""Test if client stops on HA launch."""
|
||||
"""Test if client stops on HA stop."""
|
||||
self.hass.bus.fire(EVENT_HOMEASSISTANT_STOP)
|
||||
self.hass.block_till_done()
|
||||
self.assertTrue(self.hass.data['mqtt'].async_disconnect.called)
|
||||
@@ -131,6 +130,48 @@ class TestMQTT(unittest.TestCase):
|
||||
self.hass.data['mqtt'].async_publish.call_args[0][2], 2)
|
||||
self.assertFalse(self.hass.data['mqtt'].async_publish.call_args[0][3])
|
||||
|
||||
def test_invalid_mqtt_topics(self):
|
||||
"""Test invalid topics."""
|
||||
self.assertRaises(vol.Invalid, mqtt.valid_publish_topic, 'bad+topic')
|
||||
self.assertRaises(vol.Invalid, mqtt.valid_subscribe_topic, 'bad\0one')
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
class TestMQTTCallbacks(unittest.TestCase):
|
||||
"""Test the MQTT callbacks."""
|
||||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
"""Setup things to be run when tests are started."""
|
||||
self.hass = get_test_home_assistant()
|
||||
mock_mqtt_client(self.hass)
|
||||
self.calls = []
|
||||
|
||||
def tearDown(self): # pylint: disable=invalid-name
|
||||
"""Stop everything that was started."""
|
||||
self.hass.stop()
|
||||
|
||||
@callback
|
||||
def record_calls(self, *args):
|
||||
"""Helper for recording calls."""
|
||||
self.calls.append(args)
|
||||
|
||||
def test_client_starts_on_home_assistant_mqtt_setup(self):
|
||||
"""Test if client is connected after mqtt init on bootstrap."""
|
||||
self.assertEqual(self.hass.data['mqtt']._mqttc.connect.call_count, 1)
|
||||
|
||||
def test_receiving_non_utf8_message_gets_logged(self):
|
||||
"""Test receiving a non utf8 encoded message."""
|
||||
mqtt.subscribe(self.hass, 'test-topic', self.record_calls)
|
||||
|
||||
with self.assertLogs(level='WARNING') as test_handle:
|
||||
fire_mqtt_message(self.hass, 'test-topic', b'\x9a')
|
||||
|
||||
self.hass.block_till_done()
|
||||
self.assertIn(
|
||||
"WARNING:homeassistant.components.mqtt:Can't decode payload "
|
||||
"b'\\x9a' on test-topic with encoding utf-8",
|
||||
test_handle.output[0])
|
||||
|
||||
def test_subscribe_topic(self):
|
||||
"""Test the subscription of a topic."""
|
||||
unsub = mqtt.subscribe(self.hass, 'test-topic', self.record_calls)
|
||||
@@ -296,82 +337,6 @@ class TestMQTT(unittest.TestCase):
|
||||
self.assertEqual(topic, self.calls[0][0])
|
||||
self.assertEqual(payload, self.calls[0][1])
|
||||
|
||||
def test_subscribe_binary_topic(self):
|
||||
"""Test the subscription to a binary topic."""
|
||||
mqtt.subscribe(self.hass, 'test-topic', self.record_calls,
|
||||
0, None)
|
||||
|
||||
fire_mqtt_message(self.hass, 'test-topic', 0x9a)
|
||||
|
||||
self.hass.block_till_done()
|
||||
self.assertEqual(1, len(self.calls))
|
||||
self.assertEqual('test-topic', self.calls[0][0])
|
||||
self.assertEqual(0x9a, self.calls[0][1])
|
||||
|
||||
def test_receiving_non_utf8_message_gets_logged(self):
|
||||
"""Test receiving a non utf8 encoded message."""
|
||||
mqtt.subscribe(self.hass, 'test-topic', self.record_calls)
|
||||
|
||||
with self.assertLogs(level='ERROR') as test_handle:
|
||||
fire_mqtt_message(self.hass, 'test-topic', 0x9a)
|
||||
self.hass.block_till_done()
|
||||
self.assertIn(
|
||||
"ERROR:homeassistant.components.mqtt:Illegal payload "
|
||||
"encoding utf-8 from MQTT "
|
||||
"topic: test-topic, Payload: 154",
|
||||
test_handle.output[0])
|
||||
|
||||
|
||||
class TestMQTTCallbacks(unittest.TestCase):
|
||||
"""Test the MQTT callbacks."""
|
||||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
"""Setup things to be run when tests are started."""
|
||||
self.hass = get_test_home_assistant()
|
||||
|
||||
with mock.patch('paho.mqtt.client.Client') as client:
|
||||
client().connect = lambda *args: 0
|
||||
assert setup_component(self.hass, mqtt.DOMAIN, {
|
||||
mqtt.DOMAIN: {
|
||||
mqtt.CONF_BROKER: 'mock-broker',
|
||||
}
|
||||
})
|
||||
|
||||
def tearDown(self): # pylint: disable=invalid-name
|
||||
"""Stop everything that was started."""
|
||||
self.hass.stop()
|
||||
|
||||
def test_receiving_mqtt_message_fires_hass_event(self):
|
||||
"""Test if receiving triggers an event."""
|
||||
calls = []
|
||||
|
||||
@callback
|
||||
def record(topic, payload, qos):
|
||||
"""Helper to record calls."""
|
||||
data = {
|
||||
'topic': topic,
|
||||
'payload': payload,
|
||||
'qos': qos,
|
||||
}
|
||||
calls.append(data)
|
||||
|
||||
async_dispatcher_connect(
|
||||
self.hass, mqtt.SIGNAL_MQTT_MESSAGE_RECEIVED, record)
|
||||
|
||||
MQTTMessage = namedtuple('MQTTMessage', ['topic', 'qos', 'payload'])
|
||||
message = MQTTMessage('test_topic', 1, 'Hello World!'.encode('utf-8'))
|
||||
|
||||
self.hass.data['mqtt']._mqtt_on_message(
|
||||
None, {'hass': self.hass}, message)
|
||||
self.hass.block_till_done()
|
||||
|
||||
self.assertEqual(1, len(calls))
|
||||
last_event = calls[0]
|
||||
self.assertEqual(bytearray('Hello World!', 'utf-8'),
|
||||
last_event['payload'])
|
||||
self.assertEqual(message.topic, last_event['topic'])
|
||||
self.assertEqual(message.qos, last_event['qos'])
|
||||
|
||||
def test_mqtt_failed_connection_results_in_disconnect(self):
|
||||
"""Test if connection failure leads to disconnect."""
|
||||
for result_code in range(1, 6):
|
||||
@@ -388,16 +353,11 @@ class TestMQTTCallbacks(unittest.TestCase):
|
||||
@mock.patch('homeassistant.components.mqtt.time.sleep')
|
||||
def test_mqtt_disconnect_tries_reconnect(self, mock_sleep):
|
||||
"""Test the re-connect tries."""
|
||||
self.hass.data['mqtt'].subscribed_topics = {
|
||||
'test/topic': 1,
|
||||
}
|
||||
self.hass.data['mqtt'].wanted_topics = {
|
||||
'test/progress': 0,
|
||||
'test/topic': 2,
|
||||
}
|
||||
self.hass.data['mqtt'].progress = {
|
||||
1: 'test/progress'
|
||||
}
|
||||
self.hass.data['mqtt'].subscriptions = [
|
||||
mqtt.Subscription('test/progress', None, 0),
|
||||
mqtt.Subscription('test/progress', None, 1),
|
||||
mqtt.Subscription('test/topic', None, 2),
|
||||
]
|
||||
self.hass.data['mqtt']._mqttc.reconnect.side_effect = [1, 1, 1, 0]
|
||||
self.hass.data['mqtt']._mqtt_on_disconnect(None, None, 1)
|
||||
self.assertTrue(self.hass.data['mqtt']._mqttc.reconnect.called)
|
||||
@@ -406,15 +366,77 @@ class TestMQTTCallbacks(unittest.TestCase):
|
||||
self.assertEqual([1, 2, 4],
|
||||
[call[1][0] for call in mock_sleep.mock_calls])
|
||||
|
||||
self.assertEqual({'test/topic': 2, 'test/progress': 0},
|
||||
self.hass.data['mqtt'].wanted_topics)
|
||||
self.assertEqual({}, self.hass.data['mqtt'].subscribed_topics)
|
||||
self.assertEqual({}, self.hass.data['mqtt'].progress)
|
||||
def test_retained_message_on_subscribe_received(self):
|
||||
"""Test every subscriber receives retained message on subscribe."""
|
||||
def side_effect(*args):
|
||||
async_fire_mqtt_message(self.hass, 'test/state', 'online')
|
||||
return 0, 0
|
||||
|
||||
def test_invalid_mqtt_topics(self):
|
||||
"""Test invalid topics."""
|
||||
self.assertRaises(vol.Invalid, mqtt.valid_publish_topic, 'bad+topic')
|
||||
self.assertRaises(vol.Invalid, mqtt.valid_subscribe_topic, 'bad\0one')
|
||||
self.hass.data['mqtt']._mqttc.subscribe.side_effect = side_effect
|
||||
|
||||
calls_a = mock.MagicMock()
|
||||
mqtt.subscribe(self.hass, 'test/state', calls_a)
|
||||
self.hass.block_till_done()
|
||||
self.assertTrue(calls_a.called)
|
||||
|
||||
calls_b = mock.MagicMock()
|
||||
mqtt.subscribe(self.hass, 'test/state', calls_b)
|
||||
self.hass.block_till_done()
|
||||
self.assertTrue(calls_b.called)
|
||||
|
||||
def test_not_calling_unsubscribe_with_active_subscribers(self):
|
||||
"""Test not calling unsubscribe() when other subscribers are active."""
|
||||
unsub = mqtt.subscribe(self.hass, 'test/state', None)
|
||||
mqtt.subscribe(self.hass, 'test/state', None)
|
||||
self.hass.block_till_done()
|
||||
self.assertTrue(self.hass.data['mqtt']._mqttc.subscribe.called)
|
||||
|
||||
unsub()
|
||||
self.hass.block_till_done()
|
||||
self.assertFalse(self.hass.data['mqtt']._mqttc.unsubscribe.called)
|
||||
|
||||
def test_restore_subscriptions_on_reconnect(self):
|
||||
"""Test subscriptions are restored on reconnect."""
|
||||
mqtt.subscribe(self.hass, 'test/state', None)
|
||||
self.hass.block_till_done()
|
||||
self.assertEqual(self.hass.data['mqtt']._mqttc.subscribe.call_count, 1)
|
||||
|
||||
self.hass.data['mqtt']._mqtt_on_disconnect(None, None, 0)
|
||||
self.hass.data['mqtt']._mqtt_on_connect(None, None, None, 0)
|
||||
self.hass.block_till_done()
|
||||
self.assertEqual(self.hass.data['mqtt']._mqttc.subscribe.call_count, 2)
|
||||
|
||||
def test_restore_all_active_subscriptions_on_reconnect(self):
|
||||
"""Test active subscriptions are restored correctly on reconnect."""
|
||||
self.hass.data['mqtt']._mqttc.subscribe.side_effect = (
|
||||
(0, 1), (0, 2), (0, 3), (0, 4)
|
||||
)
|
||||
|
||||
unsub = mqtt.subscribe(self.hass, 'test/state', None, qos=2)
|
||||
mqtt.subscribe(self.hass, 'test/state', None)
|
||||
mqtt.subscribe(self.hass, 'test/state', None, qos=1)
|
||||
self.hass.block_till_done()
|
||||
|
||||
expected = [
|
||||
mock.call('test/state', 2),
|
||||
mock.call('test/state', 0),
|
||||
mock.call('test/state', 1)
|
||||
]
|
||||
self.assertEqual(self.hass.data['mqtt']._mqttc.subscribe.mock_calls,
|
||||
expected)
|
||||
|
||||
unsub()
|
||||
self.hass.block_till_done()
|
||||
self.assertEqual(self.hass.data['mqtt']._mqttc.unsubscribe.call_count,
|
||||
0)
|
||||
|
||||
self.hass.data['mqtt']._mqtt_on_disconnect(None, None, 0)
|
||||
self.hass.data['mqtt']._mqtt_on_connect(None, None, None, 0)
|
||||
self.hass.block_till_done()
|
||||
|
||||
expected.append(mock.call('test/state', 1))
|
||||
self.assertEqual(self.hass.data['mqtt']._mqttc.subscribe.mock_calls,
|
||||
expected)
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
@@ -426,7 +448,7 @@ def test_setup_embedded_starts_with_no_config(hass):
|
||||
return_value=mock_coro(
|
||||
return_value=(True, client_config))
|
||||
) as _start:
|
||||
yield from mock_mqtt_client(hass, {})
|
||||
yield from async_mock_mqtt_client(hass, {})
|
||||
assert _start.call_count == 1
|
||||
|
||||
|
||||
@@ -440,7 +462,7 @@ def test_setup_embedded_with_embedded(hass):
|
||||
return_value=(True, client_config))
|
||||
) as _start:
|
||||
_start.return_value = mock_coro(return_value=(True, client_config))
|
||||
yield from mock_mqtt_client(hass, {'embedded': None})
|
||||
yield from async_mock_mqtt_client(hass, {'embedded': None})
|
||||
assert _start.call_count == 1
|
||||
|
||||
|
||||
@@ -544,13 +566,13 @@ def test_setup_with_tls_config_of_v1_under_python36_only_uses_v1(hass):
|
||||
@asyncio.coroutine
|
||||
def test_birth_message(hass):
|
||||
"""Test sending birth message."""
|
||||
mqtt_client = yield from mock_mqtt_client(hass, {
|
||||
mqtt_client = yield from async_mock_mqtt_client(hass, {
|
||||
mqtt.CONF_BROKER: 'mock-broker',
|
||||
mqtt.CONF_BIRTH_MESSAGE: {mqtt.ATTR_TOPIC: 'birth',
|
||||
mqtt.ATTR_PAYLOAD: 'birth'}
|
||||
})
|
||||
calls = []
|
||||
mqtt_client.publish = lambda *args: calls.append(args)
|
||||
mqtt_client.publish.side_effect = lambda *args: calls.append(args)
|
||||
hass.data['mqtt']._mqtt_on_connect(None, None, 0, 0)
|
||||
yield from hass.async_block_till_done()
|
||||
assert calls[-1] == ('birth', 'birth', 0, False)
|
||||
@@ -559,30 +581,26 @@ def test_birth_message(hass):
|
||||
@asyncio.coroutine
|
||||
def test_mqtt_subscribes_topics_on_connect(hass):
|
||||
"""Test subscription to topic on connect."""
|
||||
mqtt_client = yield from mock_mqtt_client(hass)
|
||||
mqtt_client = yield from async_mock_mqtt_client(hass)
|
||||
|
||||
subscribed_topics = OrderedDict()
|
||||
subscribed_topics['topic/test'] = 1
|
||||
subscribed_topics['home/sensor'] = 2
|
||||
|
||||
wanted_topics = subscribed_topics.copy()
|
||||
wanted_topics['still/pending'] = 0
|
||||
|
||||
hass.data['mqtt'].wanted_topics = wanted_topics
|
||||
hass.data['mqtt'].subscribed_topics = subscribed_topics
|
||||
hass.data['mqtt'].progress = {1: 'still/pending'}
|
||||
|
||||
# Return values for subscribe calls (rc, mid)
|
||||
mqtt_client.subscribe.side_effect = ((0, 2), (0, 3))
|
||||
hass.data['mqtt'].subscriptions = [
|
||||
mqtt.Subscription('topic/test', None),
|
||||
mqtt.Subscription('home/sensor', None, 2),
|
||||
mqtt.Subscription('still/pending', None),
|
||||
mqtt.Subscription('still/pending', None, 1),
|
||||
]
|
||||
|
||||
hass.add_job = mock.MagicMock()
|
||||
hass.data['mqtt']._mqtt_on_connect(None, None, 0, 0)
|
||||
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
assert not mqtt_client.disconnect.called
|
||||
assert mqtt_client.disconnect.call_count == 0
|
||||
|
||||
expected = [(topic, qos) for topic, qos in wanted_topics.items()]
|
||||
|
||||
assert [call[1][1:] for call in hass.add_job.mock_calls] == expected
|
||||
assert hass.data['mqtt'].progress == {}
|
||||
expected = {
|
||||
'topic/test': 0,
|
||||
'home/sensor': 2,
|
||||
'still/pending': 1
|
||||
}
|
||||
calls = {call[1][1]: call[1][2] for call in hass.add_job.mock_calls}
|
||||
assert calls == expected
|
||||
|
||||
Reference in New Issue
Block a user