diff --git a/homeassistant/components/api.py b/homeassistant/components/api.py index 351bdf5dcd2..f2a94d7633c 100644 --- a/homeassistant/components/api.py +++ b/homeassistant/components/api.py @@ -73,102 +73,94 @@ class APIEventStream(HomeAssistantView): def get(self, request): """Provide a streaming interface for the event bus.""" + from eventlet.queue import Empty import eventlet - from eventlet import queue as eventlet_queue - import queue as thread_queue - from threading import Event - from time import time + import homeassistant.util.eventlet as eventlet_util - to_write = thread_queue.Queue() - # to_write = eventlet.Queue() + cur_hub = eventlet.hubs.get_hub() + request.environ['eventlet.minimum_write_chunk_size'] = 0 + + to_write = eventlet.Queue() stop_obj = object() - hass = self.hass - connection_closed = Event() + attached_ping = None restrict = request.args.get('restrict') if restrict: restrict = restrict.split(',') - restrict = False + def thread_ping(now): + """Called from time thread to add ping to queue.""" + _LOGGER.debug('STREAM %s PING', id(stop_obj)) + eventlet_util.spawn(cur_hub, to_write.put, STREAM_PING_PAYLOAD) - def ping(now): - """Add a ping message to queue.""" - print(id(stop_obj), 'ping') - to_write.put(STREAM_PING_PAYLOAD) - - def forward_events(event): + def thread_forward_events(event): """Forward events to the open request.""" - print(id(stop_obj), 'forwarding', event) if event.event_type == EVENT_TIME_CHANGED: - pass - elif event.event_type == EVENT_HOMEASSISTANT_STOP: - to_write.put(stop_obj) + return + + _LOGGER.debug('STREAM %s FORWARDING %s', id(stop_obj), event) + + if event.event_type == EVENT_HOMEASSISTANT_STOP: + data = stop_obj else: - to_write.put(json.dumps(event, cls=rem.JSONEncoder)) + data = json.dumps(event, cls=rem.JSONEncoder) + + eventlet_util.spawn(cur_hub, to_write.put, data) + + def cleanup(): + """Clean up HA listeners.""" + _LOGGER.debug("STREAM %s CLEANING UP", id(stop_obj)) + self.hass.bus.remove_listener(EVENT_TIME_CHANGED, attached_ping) + + if restrict: + for event in restrict: + self.hass.bus.remove_listener(event, thread_forward_events) + else: + self.hass.bus.remove_listener(MATCH_ALL, thread_forward_events) def stream(): """Stream events to response.""" + nonlocal attached_ping + if restrict: for event_type in restrict: - hass.bus.listen(event_type, forward_events) + self.hass.bus.listen(event_type, thread_forward_events) else: - hass.bus.listen(MATCH_ALL, forward_events) + self.hass.bus.listen(MATCH_ALL, thread_forward_events) attached_ping = track_utc_time_change( - hass, ping, second=(0, 30)) + self.hass, thread_ping, second=range(0, 60, 3)) #(0, 30)) - print(id(stop_obj), 'attached goodness') + _LOGGER.debug('STREAM %s ATTACHED', id(stop_obj)) - while not connection_closed.is_set(): + while True: try: - print(id(stop_obj), "Try getting obj") - payload = to_write.get(False) + # Somehow our queue.get takes too long to + # be notified of arrival of object. Probably + # because of our spawning on hub in other thread + # hack. Because current goal is to get this out, + # We just timeout every second because it will + # return right away if qsize() > 0. + # So yes, we're basically polling :( + # socket.io anyone? + payload = to_write.get(timeout=1) if payload is stop_obj: break msg = "data: {}\n\n".format(payload) - print(id(stop_obj), msg) + _LOGGER.debug('STREAM %s WRITING %s', id(stop_obj), + msg.strip()) yield msg.encode("UTF-8") - except eventlet_queue.Empty: - print(id(stop_obj), "queue empty, sleep 0.5") - eventlet.sleep(.5) - except GeneratorExit: + except Empty: pass + except GeneratorExit: + _LOGGER.debug('STREAM %s RESPONSE CLOSED', id(stop_obj)) + break - print(id(stop_obj), "cleaning up") + cleanup() - hass.bus.remove_listener(EVENT_TIME_CHANGED, attached_ping) - - if restrict: - for event in restrict: - hass.bus.remove_listener(event, forward_events) - else: - hass.bus.remove_listener(MATCH_ALL, forward_events) - - resp = self.Response(stream(), mimetype='text/event-stream') - - def closing(): - print() - print() - print() - print() - print() - print() - print() - print() - print(id(stop_obj), "CLOSING RESPONSE") - print() - print() - print() - print() - print() - print() - print() - connection_closed.set() - - resp.call_on_close(closing) - return resp + return self.Response(stream(), mimetype='text/event-stream') class APIConfigView(HomeAssistantView): diff --git a/homeassistant/components/http.py b/homeassistant/components/http.py index d4965eb05c4..59d76a3c8a8 100644 --- a/homeassistant/components/http.py +++ b/homeassistant/components/http.py @@ -213,9 +213,6 @@ class HomeAssistantWSGI(object): """Register a folder to serve as a static path.""" from static import Cling - if url_root in self.extra_apps: - _LOGGER.warning("Static path '%s' is being overwritten", path) - headers = [] if not self.development: @@ -228,7 +225,14 @@ class HomeAssistantWSGI(object): "public, max-age={}".format(cache_time) }) - self.extra_apps[url_root] = Cling(path, headers=headers) + self.register_wsgi_app(url_root, Cling(path, headers=headers)) + + def register_wsgi_app(self, url_root, app): + """Register a path to serve a WSGI app.""" + if url_root in self.extra_apps: + _LOGGER.warning("Url root '%s' is being overwritten", url_root) + + self.extra_apps[url_root] = app def start(self): """Start the wsgi server.""" @@ -248,20 +252,21 @@ class HomeAssistantWSGI(object): ) from werkzeug.routing import RequestRedirect - adapter = self.url_map.bind_to_environ(request.environ) - try: - endpoint, values = adapter.match() - return self.views[endpoint].handle_request(request, **values) - except RequestRedirect as ex: - return ex - except BadRequest as ex: - return self._handle_error(request, str(ex), 400) - except NotFound as ex: - return self._handle_error(request, str(ex), 404) - except MethodNotAllowed as ex: - return self._handle_error(request, str(ex), 405) - except Unauthorized as ex: - return self._handle_error(request, str(ex), 401) + with request: + adapter = self.url_map.bind_to_environ(request.environ) + try: + endpoint, values = adapter.match() + return self.views[endpoint].handle_request(request, **values) + except RequestRedirect as ex: + return ex + except BadRequest as ex: + return self._handle_error(request, str(ex), 400) + except NotFound as ex: + return self._handle_error(request, str(ex), 404) + except MethodNotAllowed as ex: + return self._handle_error(request, str(ex), 405) + except Unauthorized as ex: + return self._handle_error(request, str(ex), 401) # TODO This long chain of except blocks is silly. _handle_error should # just take the exception as an argument and parse the status code # itself diff --git a/homeassistant/util/eventlet.py b/homeassistant/util/eventlet.py new file mode 100644 index 00000000000..54cb8cfdbe7 --- /dev/null +++ b/homeassistant/util/eventlet.py @@ -0,0 +1,9 @@ +"""Eventlet util methods.""" + + +def spawn(hub, func, *args, **kwargs): + """Spawns a function on specified hub.""" + import eventlet + g = eventlet.greenthread.GreenThread(hub.greenlet) + hub.schedule_call_global(0, g.switch, func, args, kwargs) + return g diff --git a/tests/components/test_api.py b/tests/components/test_api.py index bea333aa36b..13a2f8746b4 100644 --- a/tests/components/test_api.py +++ b/tests/components/test_api.py @@ -435,63 +435,61 @@ class TestAPI(unittest.TestCase): headers=HA_HEADERS) self.assertEqual(200, req.status_code) - # def test_stream(self): - # """Test the stream.""" - # listen_count = self._listen_count() - # with closing(requests.get(_url(const.URL_API_STREAM), - # stream=True, headers=HA_HEADERS)) as req: + def test_stream(self): + """Test the stream.""" + listen_count = self._listen_count() + with closing(requests.get(_url(const.URL_API_STREAM), + stream=True, headers=HA_HEADERS)) as req: - # self.assertEqual(listen_count + 1, self._listen_count()) + self.assertEqual(listen_count + 2, self._listen_count()) - # # eventlet.sleep(1) - # print('firing event') + hass.bus.fire('test_event') + hass.pool.block_till_done() - # hass.bus.fire('test_event') - # hass.pool.block_till_done() + data = self._stream_next_event(req) - # data = self._stream_next_event(req) + self.assertEqual('test_event', data['event_type']) - # self.assertEqual('test_event', data['event_type']) + def test_stream_with_restricted(self): + """Test the stream with restrictions.""" + listen_count = self._listen_count() + url = _url('{}?restrict=test_event1,test_event3'.format( + const.URL_API_STREAM)) - # def test_stream_with_restricted(self): - # """Test the stream with restrictions.""" - # listen_count = self._listen_count() - # with closing(requests.get(_url(const.URL_API_STREAM), - # data=json.dumps({ - # 'restrict': - # 'test_event1,test_event3'}), - # stream=True, headers=HA_HEADERS)) as req: + with closing(requests.get(url, stream=True, + headers=HA_HEADERS)) as req: - # data = self._stream_next_event(req) - # self.assertEqual('ping', data) + self.assertEqual(listen_count + 3, self._listen_count()) - # self.assertEqual(listen_count + 2, self._listen_count()) + hass.bus.fire('test_event1') + hass.pool.block_till_done() + hass.bus.fire('test_event2') + hass.pool.block_till_done() + hass.bus.fire('test_event3') + hass.pool.block_till_done() - # hass.bus.fire('test_event1') - # hass.pool.block_till_done() - # hass.bus.fire('test_event2') - # hass.pool.block_till_done() - # hass.bus.fire('test_event3') - # hass.pool.block_till_done() - - # data = self._stream_next_event(req) - # self.assertEqual('test_event1', data['event_type']) - # data = self._stream_next_event(req) - # self.assertEqual('test_event3', data['event_type']) + data = self._stream_next_event(req) + self.assertEqual('test_event1', data['event_type']) + data = self._stream_next_event(req) + self.assertEqual('test_event3', data['event_type']) def _stream_next_event(self, stream): - """Test the stream for next event.""" - data = b'' - last_new_line = False - for dat in stream.iter_content(1): - if dat == b'\n' and last_new_line: + """Read the stream for next event while ignoring ping.""" + while True: + data = b'' + last_new_line = False + for dat in stream.iter_content(1): + if dat == b'\n' and last_new_line: + break + data += dat + last_new_line = dat == b'\n' + + conv = data.decode('utf-8').strip()[6:] + + if conv != 'ping': break - data += dat - last_new_line = dat == b'\n' - conv = data.decode('utf-8').strip()[6:] - - return conv if conv == 'ping' else json.loads(conv) + return json.loads(conv) def _listen_count(self): """Return number of event listeners."""