diff --git a/homeassistant/components/telegram_bot/__init__.py b/homeassistant/components/telegram_bot/__init__.py index 943efe00751..28d0ac9fb2a 100644 --- a/homeassistant/components/telegram_bot/__init__.py +++ b/homeassistant/components/telegram_bot/__init__.py @@ -44,9 +44,12 @@ from .const import ( ATTR_CAPTION, ATTR_CHAT_ACTION, ATTR_CHAT_ID, + ATTR_DIRECTORY_PATH, ATTR_DISABLE_NOTIF, ATTR_DISABLE_WEB_PREV, ATTR_FILE, + ATTR_FILE_ID, + ATTR_FILE_NAME, ATTR_IS_ANONYMOUS, ATTR_IS_BIG, ATTR_KEYBOARD, @@ -91,6 +94,7 @@ from .const import ( PLATFORM_WEBHOOKS, SERVICE_ANSWER_CALLBACK_QUERY, SERVICE_DELETE_MESSAGE, + SERVICE_DOWNLOAD_FILE, SERVICE_EDIT_CAPTION, SERVICE_EDIT_MESSAGE, SERVICE_EDIT_MESSAGE_MEDIA, @@ -328,6 +332,15 @@ SERVICE_SCHEMA_SET_MESSAGE_REACTION = vol.Schema( } ) +SERVICE_SCHEMA_DOWNLOAD_FILE = vol.Schema( + { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, + vol.Required(ATTR_FILE_ID): cv.string, + vol.Optional(ATTR_DIRECTORY_PATH): cv.string, + vol.Optional(ATTR_FILE_NAME): cv.string, + } +) + SERVICE_MAP: dict[str, VolSchemaType] = { SERVICE_SEND_MESSAGE: SERVICE_SCHEMA_SEND_MESSAGE, SERVICE_SEND_CHAT_ACTION: SERVICE_SCHEMA_SEND_CHAT_ACTION, @@ -347,6 +360,7 @@ SERVICE_MAP: dict[str, VolSchemaType] = { SERVICE_DELETE_MESSAGE: SERVICE_SCHEMA_DELETE_MESSAGE, SERVICE_LEAVE_CHAT: SERVICE_SCHEMA_LEAVE_CHAT, SERVICE_SET_MESSAGE_REACTION: SERVICE_SCHEMA_SET_MESSAGE_REACTION, + SERVICE_DOWNLOAD_FILE: SERVICE_SCHEMA_DOWNLOAD_FILE, } @@ -442,6 +456,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: await notify_service.set_message_reaction(context=service.context, **kwargs) elif msgtype == SERVICE_EDIT_MESSAGE_MEDIA: await notify_service.edit_message_media(context=service.context, **kwargs) + elif msgtype == SERVICE_DOWNLOAD_FILE: + return await notify_service.download_file(context=service.context, **kwargs) else: await notify_service.edit_message( msgtype, context=service.context, **kwargs @@ -487,6 +503,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: SERVICE_SEND_STICKER, SERVICE_SEND_LOCATION, SERVICE_SEND_POLL, + SERVICE_DOWNLOAD_FILE, ]: supports_response = SupportsResponse.OPTIONAL diff --git a/homeassistant/components/telegram_bot/bot.py b/homeassistant/components/telegram_bot/bot.py index 325aa7ffbc6..ce091863b40 100644 --- a/homeassistant/components/telegram_bot/bot.py +++ b/homeassistant/components/telegram_bot/bot.py @@ -5,6 +5,8 @@ import asyncio from collections.abc import Callable, Sequence import io import logging +import os +from pathlib import Path from ssl import SSLContext from types import MappingProxyType from typing import Any, cast @@ -13,6 +15,7 @@ import httpx from telegram import ( Bot, CallbackQuery, + File, InlineKeyboardButton, InlineKeyboardMarkup, InputMedia, @@ -45,6 +48,7 @@ from homeassistant.const import ( from homeassistant.core import Context, HomeAssistant from homeassistant.exceptions import HomeAssistantError, ServiceValidationError from homeassistant.helpers.dispatcher import async_dispatcher_send +from homeassistant.util.json import JsonValueType from homeassistant.util.ssl import get_default_context, get_default_no_verify_context from .const import ( @@ -61,6 +65,7 @@ from .const import ( ATTR_FILE_ID, ATTR_FILE_MIME_TYPE, ATTR_FILE_NAME, + ATTR_FILE_PATH, ATTR_FILE_SIZE, ATTR_FROM_FIRST, ATTR_FROM_LAST, @@ -1037,6 +1042,68 @@ class TelegramNotificationService: context=context, ) + async def download_file( + self, + file_id: str, + directory_path: str | None = None, + file_name: str | None = None, + context: Context | None = None, + **kwargs: dict[str, Any], + ) -> dict[str, JsonValueType]: + """Download a file from Telegram.""" + if not directory_path: + directory_path = self.hass.config.path(DOMAIN) + if not await self.hass.async_add_executor_job( + self.hass.config.is_allowed_path, directory_path + ): + raise ServiceValidationError( + "File path has not been configured in allowlist_external_dirs.", + translation_domain=DOMAIN, + translation_key="allowlist_external_dirs_error", + ) + file: File = await self._send_msg( + self.bot.get_file, + "Error getting file", + None, + file_id=file_id, + context=context, + ) + if not file.file_path: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="action_failed", + translation_placeholders={ + "error": "No file path returned from Telegram" + }, + ) + if not file_name: + file_name = os.path.basename(file.file_path) + + custom_path = os.path.join(directory_path, file_name) + await self.hass.async_add_executor_job( + self._prepare_download_directory, directory_path + ) + _LOGGER.debug("Download file %s to %s", file_id, custom_path) + try: + file_content = await file.download_as_bytearray() + await self.hass.async_add_executor_job( + Path(custom_path).write_bytes, file_content + ) + except (RuntimeError, OSError, TelegramError) as exc: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="action_failed", + translation_placeholders={"error": str(exc)}, + ) from exc + return {ATTR_FILE_PATH: custom_path} + + @staticmethod + def _prepare_download_directory(directory_path: str) -> None: + """Create download directory if it does not exist.""" + if not os.path.exists(directory_path): + _LOGGER.debug("directory %s does not exist, creating it", directory_path) + os.makedirs(directory_path, exist_ok=True) + def initialize_bot(hass: HomeAssistant, p_config: MappingProxyType[str, Any]) -> Bot: """Initialize telegram bot with proxy support.""" diff --git a/homeassistant/components/telegram_bot/const.py b/homeassistant/components/telegram_bot/const.py index a92661bdf42..58f02fa06fa 100644 --- a/homeassistant/components/telegram_bot/const.py +++ b/homeassistant/components/telegram_bot/const.py @@ -43,6 +43,7 @@ SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup" SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query" SERVICE_DELETE_MESSAGE = "delete_message" SERVICE_LEAVE_CHAT = "leave_chat" +SERVICE_DOWNLOAD_FILE = "download_file" SIGNAL_UPDATE_EVENT = "telegram_bot_update_event" EVENT_TELEGRAM_CALLBACK = "telegram_callback" @@ -83,9 +84,11 @@ ATTR_CHAT_INSTANCE = "chat_instance" ATTR_DATE = "date" ATTR_DISABLE_NOTIF = "disable_notification" ATTR_DISABLE_WEB_PREV = "disable_web_page_preview" +ATTR_DIRECTORY_PATH = "directory_path" ATTR_EDITED_MSG = "edited_message" ATTR_FILE = "file" ATTR_FILE_ID = "file_id" +ATTR_FILE_PATH = "file_path" ATTR_FILE_MIME_TYPE = "file_mime_type" ATTR_FILE_NAME = "file_name" ATTR_FILE_SIZE = "file_size" diff --git a/homeassistant/components/telegram_bot/icons.json b/homeassistant/components/telegram_bot/icons.json index 0f4c4dfca52..50f4424f945 100644 --- a/homeassistant/components/telegram_bot/icons.json +++ b/homeassistant/components/telegram_bot/icons.json @@ -13,6 +13,9 @@ "delete_message": { "service": "mdi:delete" }, + "download_file": { + "service": "mdi:paperclip" + }, "edit_caption": { "service": "mdi:pencil" }, diff --git a/homeassistant/components/telegram_bot/services.yaml b/homeassistant/components/telegram_bot/services.yaml index e0480b8bfdc..e53e19ed98b 100644 --- a/homeassistant/components/telegram_bot/services.yaml +++ b/homeassistant/components/telegram_bot/services.yaml @@ -911,3 +911,25 @@ set_message_reaction: required: false selector: boolean: + +download_file: + fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot + file_id: + required: true + example: "ABCD1234Efgh5678Ijkl90mnopQRStuvwx" + selector: + text: + directory_path: + required: false + default: "/config/telegram_bot" + selector: + text: + file_name: + required: false + example: "my_downloaded_file" + selector: + text: diff --git a/homeassistant/components/telegram_bot/strings.json b/homeassistant/components/telegram_bot/strings.json index 6c6d3a8c8e7..71620b54c09 100644 --- a/homeassistant/components/telegram_bot/strings.json +++ b/homeassistant/components/telegram_bot/strings.json @@ -324,6 +324,29 @@ }, "name": "Delete message" }, + "download_file": { + "description": "Download the file to a local path.", + "fields": { + "config_entry_id": { + "description": "The config entry representing the Telegram bot to get the file.", + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]" + }, + "directory_path": { + "description": "Local directory path to save the file to. Defaults to the 'telegram_bot' directory within your Home Assistant configuration directory.", + "example": "/config/telegram_bot", + "name": "Directory path" + }, + "file_id": { + "description": "ID of the file to get.", + "name": "File ID" + }, + "file_name": { + "description": "Name to save the file as. If not provided, the original file name will be used.", + "name": "File name" + } + }, + "name": "Download file" + }, "edit_caption": { "description": "Edits the caption of a previously sent message.", "fields": { diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index 1118dea6c59..25aa49e93fc 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -4,11 +4,20 @@ import base64 from datetime import datetime from http import HTTPStatus import io +import os +from pathlib import Path from typing import Any from unittest.mock import AsyncMock, MagicMock, mock_open, patch import pytest -from telegram import Chat, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update +from telegram import ( + Chat, + File, + InlineKeyboardButton, + InlineKeyboardMarkup, + Message, + Update, +) from telegram.constants import ChatType, InputMediaType, ParseMode from telegram.error import ( InvalidToken, @@ -29,9 +38,12 @@ from homeassistant.components.telegram_bot.const import ( ATTR_CAPTION, ATTR_CHAT_ACTION, ATTR_CHAT_ID, + ATTR_DIRECTORY_PATH, ATTR_DISABLE_NOTIF, ATTR_DISABLE_WEB_PREV, ATTR_FILE, + ATTR_FILE_ID, + ATTR_FILE_NAME, ATTR_KEYBOARD, ATTR_KEYBOARD_INLINE, ATTR_MEDIA_TYPE, @@ -85,7 +97,7 @@ from homeassistant.const import ( HTTP_BEARER_AUTHENTICATION, HTTP_DIGEST_AUTHENTICATION, ) -from homeassistant.core import Context, Event, HomeAssistant +from homeassistant.core import Context, Event, HomeAssistant, ServiceResponse from homeassistant.exceptions import ( ConfigEntryAuthFailed, HomeAssistantError, @@ -392,6 +404,71 @@ def _read_file_as_bytesio_mock(file_path): return _file +async def _run_download_file_service_with_mocks( + hass: HomeAssistant, + schema_request: dict[str, Any], + telegram_file: File, + download_bytes: Any, +) -> tuple[AsyncMock, AsyncMock, dict[str, Any], ServiceResponse]: + """Run the download_file service with common mocks and return mocks/results. + + Returns (get_file_mock, download_as_bytearray_mock, write_called) + """ + write_called: dict[str, Any] = {} + + def fake_write_bytes(self, data: bytes) -> None: + write_called["self"] = self + write_called["data"] = data + + with ( + patch( + "homeassistant.components.telegram_bot.bot.Bot.get_file", + AsyncMock(return_value=telegram_file), + ) as get_file_mock, + patch( + "telegram.File.download_as_bytearray", + AsyncMock(return_value=download_bytes), + ) as download_as_bytearray_mock, + patch("pathlib.Path.write_bytes", new=fake_write_bytes), + ): + response = await hass.services.async_call( + DOMAIN, + "download_file", + schema_request, + blocking=True, + return_response=True, + ) + await hass.async_block_till_done() + + return get_file_mock, download_as_bytearray_mock, write_called, response + + +def _assert_download_file_mocks_calls( + file_content, + expected_path, + file_id, + get_file_mock, + download_as_bytearray_mock, + write_called, +): + """Common assert for mocks calls.""" + get_file_mock.assert_called_once_with(file_id=file_id) + download_as_bytearray_mock.assert_called_once() + assert "self" in write_called + assert str(write_called["self"]) == expected_path + assert write_called["data"] == file_content + + +def _assert_download_file_response( + response: ServiceResponse, expected_path: str +) -> None: + """Assert download file response.""" + assert response is not None + assert len(response.keys()) == 1 + assert "file_path" in response + assert response["file_path"] == expected_path + + async def test_send_chat_action( hass: HomeAssistant, webhook_bot, @@ -1649,3 +1726,363 @@ async def test_deprecated_timeout_parameter( # verify issue is resolved assert not issue_registry.async_get_issue(DOMAIN, "deprecated_timeout_parameter") + + +@pytest.mark.parametrize( + ( + "telegram_file_name", + "schema_request", + "expected_download_path", + ), + [ + pytest.param( + "file_name1.jpg", + {ATTR_FILE_ID: "some-file-id-1"}, + lambda hass: f"{hass.config.path(DOMAIN)}/file_name1.jpg", + id="no_custom_name", + ), + pytest.param( + "file_name2.jpg", + {ATTR_FILE_ID: "some-file-id-2", ATTR_FILE_NAME: "custom_name2.jpg"}, + lambda hass: f"{hass.config.path(DOMAIN)}/custom_name2.jpg", + id="custom_name", + ), + ], +) +async def test_download_file_no_custom_dir( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, + telegram_file_name: str, + schema_request: dict[str, Any], + expected_download_path: Any, +) -> None: + """Test download file.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + allowlist_dir = hass.config.path(DOMAIN) + expected_path = expected_download_path(hass) + # verify dir exists, if not create it + await hass.async_add_executor_job(os.makedirs, allowlist_dir, 0o777, True) + + hass.config.allowlist_external_dirs.add(allowlist_dir) + + file_id = schema_request[ATTR_FILE_ID] + telegram_file = File( + file_id=file_id, + file_unique_id="file_unique_id", + file_path=f"file/path/{telegram_file_name}", + ) + file_content = f"This is the file content of {telegram_file_name}".encode() + ( + get_file_mock, + download_as_bytearray_mock, + write_called, + response, + ) = await _run_download_file_service_with_mocks( + hass, + schema_request, + telegram_file, + file_content, + ) + + _assert_download_file_mocks_calls( + file_content, + expected_path, + file_id, + get_file_mock, + download_as_bytearray_mock, + write_called, + ) + _assert_download_file_response(response, expected_path) + + +@pytest.mark.parametrize( + ( + "telegram_file_name", + "schema_request", + "expected_download_path", + ), + [ + pytest.param( + "file_name3.jpg", + { + ATTR_FILE_ID: "some-file-id-3", + }, + lambda path: f"{path}/file_name3.jpg", + id="no_custom_name_custom_dir", + ), + pytest.param( + "file_name4.jpg", + { + ATTR_FILE_ID: "some-file-id-4", + ATTR_FILE_NAME: "custom_name4.jpg", + }, + lambda path: f"{path}/custom_name4.jpg", + id="custom_name_custom_dir", + ), + ], +) +async def test_download_file_custom_dir( + tmp_path: Path, + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, + telegram_file_name: str, + schema_request: dict[str, Any], + expected_download_path: Any, +) -> None: + """Test download file.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + allowlist_dir = tmp_path.as_posix() + schema_request[ATTR_DIRECTORY_PATH] = allowlist_dir + expected_path = expected_download_path(tmp_path.as_posix()) + + hass.config.allowlist_external_dirs.add(allowlist_dir) + + file_id = schema_request[ATTR_FILE_ID] + telegram_file = File( + file_id=file_id, + file_unique_id="file_unique_id", + file_path=f"file/path/{telegram_file_name}", + ) + file_content = f"This is the file content of {telegram_file_name}".encode() + ( + get_file_mock, + download_as_bytearray_mock, + write_called, + response, + ) = await _run_download_file_service_with_mocks( + hass, + schema_request, + telegram_file, + file_content, + ) + + _assert_download_file_mocks_calls( + file_content, + expected_path, + file_id, + get_file_mock, + download_as_bytearray_mock, + write_called, + ) + _assert_download_file_response(response, expected_path) + + +async def test_download_file_directory_created_successfully( + tmp_path: Path, + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test on non exists temporary directory.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + allowlist_dir = tmp_path.as_posix() + hass.config.allowlist_external_dirs.add(allowlist_dir) + download_path = os.path.join(allowlist_dir, "download_dir") + + schema_request = { + ATTR_FILE_ID: "file-id-for-new-dir", + ATTR_DIRECTORY_PATH: download_path, + ATTR_FILE_NAME: "dont_care.jpg", + } + + expected_path = os.path.join(download_path, schema_request[ATTR_FILE_NAME]) + + telegram_file = File( + file_id=schema_request[ATTR_FILE_ID], + file_unique_id="file_unique_id", + file_path=f"file/path/{schema_request[ATTR_FILE_NAME]}", + ) + file_content = "file content for new dir" + ( + get_file_mock, + download_as_bytearray_mock, + write_called, + response, + ) = await _run_download_file_service_with_mocks( + hass, schema_request, telegram_file, file_content + ) + + _assert_download_file_mocks_calls( + file_content, + expected_path, + schema_request[ATTR_FILE_ID], + get_file_mock, + download_as_bytearray_mock, + write_called, + ) + _assert_download_file_response(response, expected_path) + + +async def test_download_file_when_bot_failed_to_get_file( + tmp_path: Path, + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test download file when bot failed to get file.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + allowlist_dir = tmp_path.as_posix() + hass.config.allowlist_external_dirs.add(allowlist_dir) + + schema_request = { + ATTR_FILE_ID: "some-file-id", + ATTR_DIRECTORY_PATH: tmp_path.as_posix(), + ATTR_FILE_NAME: "custom_name.jpg", + } + + with ( + patch( + "homeassistant.components.telegram_bot.bot.Bot.get_file", + AsyncMock(side_effect=TelegramError("failed to get file")), + ), + pytest.raises(HomeAssistantError) as err, + ): + await hass.services.async_call( + DOMAIN, + "download_file", + schema_request, + blocking=True, + ) + await hass.async_block_till_done() + assert err.value.translation_key == "action_failed" + + +async def test_download_file_when_dir_not_allowed( + tmp_path: Path, + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test download file when bot failed to get file.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + telegram_file_name = "some_file.jpg" + + schema_request = { + ATTR_FILE_ID: "some-file-id", + ATTR_DIRECTORY_PATH: tmp_path.as_posix(), + ATTR_FILE_NAME: telegram_file_name, + } + + with pytest.raises(ServiceValidationError) as err: + await hass.services.async_call( + DOMAIN, + "download_file", + schema_request, + blocking=True, + return_response=True, + ) + + await hass.async_block_till_done() + assert err.value.translation_key == "allowlist_external_dirs_error" + + +async def test_download_file_when_empty_file_path( + tmp_path: Path, + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test download file when bot failed to get file.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + allowlist_dir = tmp_path.as_posix() + hass.config.allowlist_external_dirs.add(allowlist_dir) + + schema_request = { + ATTR_FILE_ID: "some-file-id", + ATTR_DIRECTORY_PATH: tmp_path.as_posix(), + ATTR_FILE_NAME: "custom_name.jpg", + } + telegram_file = File( + file_id=schema_request[ATTR_FILE_ID], + file_unique_id="file_unique_id", + ) + + with pytest.raises(HomeAssistantError) as err: + await _run_download_file_service_with_mocks( + hass, schema_request, telegram_file, "file_content" + ) + await hass.async_block_till_done() + assert err.value.translation_placeholders is not None + assert "error" in err.value.translation_placeholders + assert ( + err.value.translation_placeholders["error"] + == "No file path returned from Telegram" + ) + + +@pytest.mark.parametrize( + "exception_class", + [ + RuntimeError, + OSError, + TelegramError, + ], +) +async def test_download_file_when_error_when_downloading( + tmp_path: Path, + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, + exception_class: type[Exception], +) -> None: + """Test download file when bot failed to get file.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + allowlist_dir = tmp_path.as_posix() + hass.config.allowlist_external_dirs.add(allowlist_dir) + + schema_request = { + ATTR_FILE_ID: "some-file-id", + ATTR_DIRECTORY_PATH: tmp_path.as_posix(), + ATTR_FILE_NAME: "custom_name.jpg", + } + + telegram_file = File( + file_id=schema_request[ATTR_FILE_ID], + file_unique_id="file_unique_id", + file_path=f"file/path/{schema_request[ATTR_FILE_NAME]}", + ) + + with ( + patch( + "homeassistant.components.telegram_bot.bot.Bot.get_file", + AsyncMock(return_value=telegram_file), + ), + patch( + "telegram.File.download_as_bytearray", + AsyncMock(side_effect=exception_class("failed to download file")), + ), + pytest.raises(HomeAssistantError) as err, + ): + await hass.services.async_call( + DOMAIN, + "download_file", + schema_request, + blocking=True, + return_response=True, + ) + await hass.async_block_till_done() + assert err.value.translation_placeholders is not None + assert "error" in err.value.translation_placeholders + assert err.value.translation_placeholders["error"] == "failed to download file"