"""Telegram bot classes and utilities.""" from abc import abstractmethod import asyncio from collections.abc import Awaitable, Callable, Iterable, Sequence import io import logging import os from pathlib import Path from types import MappingProxyType from typing import Any, cast import httpx from telegram import ( Bot, CallbackQuery, File, InlineKeyboardButton, InlineKeyboardMarkup, InputMedia, InputMediaAnimation, InputMediaAudio, InputMediaDocument, InputMediaPhoto, InputMediaVideo, InputPollOption, Message, PhotoSize, ReplyKeyboardMarkup, ReplyKeyboardRemove, Update, User, ) from telegram.constants import InputMediaType, ParseMode from telegram.error import TelegramError from telegram.ext import CallbackContext, filters from telegram.request import HTTPXRequest from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_COMMAND, ATTR_DATE, CONF_API_KEY, HTTP_BASIC_AUTHENTICATION, HTTP_BEARER_AUTHENTICATION, HTTP_DIGEST_AUTHENTICATION, ) from homeassistant.core import Context, HomeAssistant from homeassistant.exceptions import HomeAssistantError, ServiceValidationError from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.httpx_client import get_async_client from homeassistant.util import raise_if_invalid_filename, raise_if_invalid_path from homeassistant.util.json import JsonValueType from .const import ( ATTR_ARGS, ATTR_AUTHENTICATION, ATTR_CAPTION, ATTR_CHAT_ID, ATTR_CHAT_INSTANCE, ATTR_DATA, ATTR_DISABLE_NOTIF, ATTR_DISABLE_WEB_PREV, ATTR_FILE, ATTR_FILE_ID, ATTR_FILE_MIME_TYPE, ATTR_FILE_NAME, ATTR_FILE_PATH, ATTR_FILE_SIZE, ATTR_FROM_FIRST, ATTR_FROM_LAST, ATTR_INLINE_MESSAGE_ID, ATTR_KEYBOARD, ATTR_KEYBOARD_INLINE, ATTR_MEDIA, ATTR_MEDIA_TYPE, ATTR_MESSAGE, ATTR_MESSAGE_ID, ATTR_MESSAGE_TAG, ATTR_MESSAGE_THREAD_ID, ATTR_MSG, ATTR_MSGID, ATTR_ONE_TIME_KEYBOARD, ATTR_OPEN_PERIOD, ATTR_PARSER, ATTR_PASSWORD, ATTR_PROTECT_CONTENT, ATTR_REPLY_TO_MSGID, ATTR_REPLYMARKUP, ATTR_RESIZE_KEYBOARD, ATTR_STICKER_ID, ATTR_TEXT, ATTR_TIMEOUT, ATTR_TITLE, ATTR_URL, ATTR_USER_ID, ATTR_USERNAME, ATTR_VERIFY_SSL, CONF_API_ENDPOINT, CONF_CHAT_ID, CONF_PROXY_URL, DEFAULT_TIMEOUT_SECONDS, DOMAIN, EVENT_TELEGRAM_ATTACHMENT, EVENT_TELEGRAM_CALLBACK, EVENT_TELEGRAM_COMMAND, EVENT_TELEGRAM_SENT, EVENT_TELEGRAM_TEXT, PARSER_HTML, PARSER_MD, PARSER_MD2, PARSER_PLAIN_TEXT, SERVICE_EDIT_CAPTION, SERVICE_EDIT_MESSAGE, SERVICE_SEND_DOCUMENT, SERVICE_SEND_PHOTO, SERVICE_SEND_STICKER, SERVICE_SEND_VIDEO, SERVICE_SEND_VOICE, ) from .helpers import signal _FILE_TYPES = ("animation", "document", "photo", "sticker", "video", "voice") _LOGGER = logging.getLogger(__name__) type TelegramBotConfigEntry = ConfigEntry[TelegramNotificationService] def _get_bot_info(bot: Bot, config_entry: ConfigEntry) -> dict[str, Any]: return { "config_entry_id": config_entry.entry_id, "id": bot.id, "first_name": bot.first_name, "last_name": bot.last_name, "username": bot.username, } class BaseTelegramBot: """The base class for the telegram bot.""" def __init__( self, hass: HomeAssistant, config: TelegramBotConfigEntry, bot: Bot ) -> None: """Initialize the bot base class.""" self.hass = hass self.config = config self.most_recent_chat_id: int | None = None self._bot = bot @abstractmethod async def shutdown(self) -> None: """Shutdown the bot application.""" async def handle_update(self, update: Update, context: CallbackContext) -> bool: """Handle updates from bot application set up by the respective platform.""" _LOGGER.debug("Handling update %s", update) # establish event type: text, command or callback_query if update.callback_query: # NOTE: Check for callback query first since # effective message will be populated with the message # in .callback_query (python-telegram-bot docs are wrong) event_type, event_data = self._get_callback_query_event_data( update.callback_query ) elif update.effective_message: event_type, event_data = self._get_message_event_data( update.effective_message ) else: _LOGGER.warning("Unhandled update: %s", update) return True self.most_recent_chat_id = event_data[ATTR_CHAT_ID] if not self.authorize_update(update): return False event_data["bot"] = _get_bot_info(self._bot, self.config) event_context = Context() _LOGGER.debug("Firing event %s: %s", event_type, event_data) self.hass.bus.async_fire(event_type, event_data, context=event_context) async_dispatcher_send(self.hass, signal(self._bot), event_type, event_data) return True @staticmethod def _get_command_event_data(command_text: str | None) -> dict[str, str | list]: if not command_text or not command_text.startswith("/"): return {} command_parts = command_text.split() command = command_parts[0] args = command_parts[1:] return {ATTR_COMMAND: command, ATTR_ARGS: args} def _get_message_event_data(self, message: Message) -> tuple[str, dict[str, Any]]: event_data: dict[str, Any] = { ATTR_MSGID: message.message_id, ATTR_CHAT_ID: message.chat.id, ATTR_DATE: message.date, ATTR_MESSAGE_THREAD_ID: message.message_thread_id, } if filters.COMMAND.filter(message): # This is a command message - set event type # to command and split data into command and args event_type = EVENT_TELEGRAM_COMMAND event_data.update(self._get_command_event_data(message.text)) elif filters.ATTACHMENT.filter(message): event_type = EVENT_TELEGRAM_ATTACHMENT event_data[ATTR_TEXT] = message.caption event_data.update(self._get_file_id_event_data(message)) else: event_type = EVENT_TELEGRAM_TEXT event_data[ATTR_TEXT] = message.text if message.from_user: event_data.update(self._get_user_event_data(message.from_user)) return event_type, event_data def _get_file_id_event_data(self, message: Message) -> dict[str, Any]: """Extract file_id from a message attachment, if any.""" if filters.PHOTO.filter(message): photos = cast(Sequence[PhotoSize], message.effective_attachment) return { ATTR_FILE_ID: photos[-1].file_id, ATTR_FILE_MIME_TYPE: "image/jpeg", ATTR_FILE_SIZE: photos[-1].file_size, } return { k: getattr(message.effective_attachment, v) for k, v in ( (ATTR_FILE_ID, "file_id"), (ATTR_FILE_NAME, "file_name"), (ATTR_FILE_MIME_TYPE, "mime_type"), (ATTR_FILE_SIZE, "file_size"), ) if hasattr(message.effective_attachment, v) } def _get_user_event_data(self, user: User) -> dict[str, Any]: return { ATTR_USER_ID: user.id, ATTR_FROM_FIRST: user.first_name, ATTR_FROM_LAST: user.last_name, } def _get_callback_query_event_data( self, callback_query: CallbackQuery ) -> tuple[str, dict[str, Any]]: event_type = EVENT_TELEGRAM_CALLBACK event_data: dict[str, Any] = { ATTR_MSGID: callback_query.id, ATTR_CHAT_INSTANCE: callback_query.chat_instance, ATTR_DATA: callback_query.data, ATTR_MSG: None, ATTR_CHAT_ID: None, } if callback_query.message: event_data[ATTR_MSG] = callback_query.message.to_dict() event_data[ATTR_CHAT_ID] = callback_query.message.chat.id if callback_query.from_user: event_data.update(self._get_user_event_data(callback_query.from_user)) # Split data into command and args if possible event_data.update(self._get_command_event_data(callback_query.data)) return event_type, event_data def authorize_update(self, update: Update) -> bool: """Make sure either user or chat is in allowed_chat_ids.""" from_user = update.effective_user.id if update.effective_user else None from_chat = update.effective_chat.id if update.effective_chat else None allowed_chat_ids: list[int] = [ subentry.data[CONF_CHAT_ID] for subentry in self.config.subentries.values() ] if from_user in allowed_chat_ids or from_chat in allowed_chat_ids: return True _LOGGER.error( ( "Unauthorized update - neither user id %s nor chat id %s is in allowed" " chats: %s" ), from_user, from_chat, allowed_chat_ids, ) return False class TelegramNotificationService: """Implement the notification services for the Telegram Bot domain.""" def __init__( self, hass: HomeAssistant, app: BaseTelegramBot | None, bot: Bot, config: TelegramBotConfigEntry, parser: str, ) -> None: """Initialize the service.""" self.app = app self.config = config self.old_config_data = config.data.copy() self._parsers: dict[str, str | None] = { PARSER_HTML: ParseMode.HTML, PARSER_MD: ParseMode.MARKDOWN, PARSER_MD2: ParseMode.MARKDOWN_V2, PARSER_PLAIN_TEXT: None, } self.parse_mode = self._parsers[parser] self.bot = bot self.hass = hass self._last_message_id: dict[int, int] = {} def _get_msg_ids( self, msg_data: dict[str, Any], chat_id: int ) -> tuple[Any | None, int | None]: """Get the message id to edit. This can be one of (message_id, inline_message_id) from a msg dict, returning a tuple. **You can use 'last' as message_id** to edit the message last sent in the chat_id. """ message_id: Any | None = None inline_message_id: int | None = None if ATTR_MESSAGE_ID in msg_data: message_id = msg_data[ATTR_MESSAGE_ID] if ( isinstance(message_id, str) and (message_id == "last") and (chat_id in self._last_message_id) ): message_id = self._last_message_id[chat_id] else: inline_message_id = msg_data[ATTR_INLINE_MESSAGE_ID] return message_id, inline_message_id def _get_msg_kwargs(self, data: dict[str, Any]) -> dict[str, Any]: """Get parameters in message data kwargs.""" def _make_row_inline_keyboard(row_keyboard: Any) -> list[InlineKeyboardButton]: """Make a list of InlineKeyboardButtons. It can accept: - a list of tuples like: `[(text_b1, data_callback_b1), (text_b2, data_callback_b2), ...] - a string like: `/cmd1, /cmd2, /cmd3` - or a string like: `text_b1:/cmd1, text_b2:/cmd2` - also supports urls instead of callback commands """ buttons = [] if isinstance(row_keyboard, str): for key in row_keyboard.split(","): if ":/" in key: # check if command or URL if "https://" in key: label = key.split(":")[0] url = key[len(label) + 1 :] buttons.append(InlineKeyboardButton(label, url=url)) else: # commands like: 'Label:/cmd' become ('Label', '/cmd') label = key.split(":/")[0] command = key[len(label) + 1 :] buttons.append( InlineKeyboardButton(label, callback_data=command) ) else: # commands like: '/cmd' become ('CMD', '/cmd') label = key.strip()[1:].upper() buttons.append(InlineKeyboardButton(label, callback_data=key)) elif isinstance(row_keyboard, list): for entry in row_keyboard: text_btn, data_btn = entry if data_btn.startswith("https://"): buttons.append(InlineKeyboardButton(text_btn, url=data_btn)) else: buttons.append( InlineKeyboardButton(text_btn, callback_data=data_btn) ) else: raise ServiceValidationError( translation_domain=DOMAIN, translation_key="invalid_inline_keyboard", ) return buttons # Defaults params: dict[str, Any] = { ATTR_PARSER: self.parse_mode, ATTR_DISABLE_NOTIF: False, ATTR_DISABLE_WEB_PREV: None, ATTR_REPLY_TO_MSGID: None, ATTR_REPLYMARKUP: None, ATTR_TIMEOUT: None, ATTR_MESSAGE_TAG: None, ATTR_MESSAGE_THREAD_ID: None, } if data is not None: if ATTR_PARSER in data: params[ATTR_PARSER] = data[ATTR_PARSER] if ATTR_TIMEOUT in data: params[ATTR_TIMEOUT] = data[ATTR_TIMEOUT] if ATTR_DISABLE_NOTIF in data: params[ATTR_DISABLE_NOTIF] = data[ATTR_DISABLE_NOTIF] if ATTR_DISABLE_WEB_PREV in data: params[ATTR_DISABLE_WEB_PREV] = data[ATTR_DISABLE_WEB_PREV] if ATTR_REPLY_TO_MSGID in data: params[ATTR_REPLY_TO_MSGID] = data[ATTR_REPLY_TO_MSGID] if ATTR_MESSAGE_TAG in data: params[ATTR_MESSAGE_TAG] = data[ATTR_MESSAGE_TAG] if ATTR_MESSAGE_THREAD_ID in data: params[ATTR_MESSAGE_THREAD_ID] = data[ATTR_MESSAGE_THREAD_ID] # Keyboards: if ATTR_KEYBOARD in data: keys = data[ATTR_KEYBOARD] keys = keys if isinstance(keys, list) else [keys] if keys: params[ATTR_REPLYMARKUP] = ReplyKeyboardMarkup( [[key.strip() for key in row.split(",")] for row in keys], resize_keyboard=data.get(ATTR_RESIZE_KEYBOARD, False), one_time_keyboard=data.get(ATTR_ONE_TIME_KEYBOARD, False), ) else: params[ATTR_REPLYMARKUP] = ReplyKeyboardRemove(True) elif ATTR_KEYBOARD_INLINE in data: keys = data.get(ATTR_KEYBOARD_INLINE) keys = keys if isinstance(keys, list) else [keys] params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup( [_make_row_inline_keyboard(row) for row in keys] ) if params[ATTR_PARSER] == PARSER_PLAIN_TEXT: params[ATTR_PARSER] = None return params async def _send_msg_formatted( self, func_send: Callable[..., Awaitable[Message | tuple[Message, ...]]], message_tag: str | None, *args_msg: Any, context: Context | None = None, **kwargs_msg: Any, ) -> dict[str, JsonValueType]: """Sends a message and formats the response. :return: dict with chat_id keys and message_id values for successful sends """ chat_id: int = kwargs_msg.pop(ATTR_CHAT_ID) _LOGGER.debug("%s to chat ID %s", func_send.__name__, chat_id) response: Message | tuple[Message, ...] = await self._send_msg( func_send, message_tag, chat_id, *args_msg, context=context, **kwargs_msg, ) if isinstance(response, Iterable): return {str(chat_id): [message.id for message in response]} return {str(chat_id): response.id} async def _send_msg( self, func_send: Callable[..., Awaitable[Any]], message_tag: str | None, *args_msg: Any, context: Context | None = None, **kwargs_msg: Any, ) -> Any: """Send one message.""" out = await func_send(*args_msg, **kwargs_msg) message = out if isinstance(message, Iterable): message = out[-1] if isinstance(message, Message): chat_id = message.chat_id message_id = message.message_id self._last_message_id[chat_id] = message_id _LOGGER.debug( "Last message ID: %s (from chat_id %s)", self._last_message_id, chat_id, ) event_data: dict[str, Any] = { ATTR_CHAT_ID: chat_id, ATTR_MESSAGE_ID: message_id, } if message_tag is not None: event_data[ATTR_MESSAGE_TAG] = message_tag if kwargs_msg.get(ATTR_MESSAGE_THREAD_ID) is not None: event_data[ATTR_MESSAGE_THREAD_ID] = kwargs_msg[ATTR_MESSAGE_THREAD_ID] event_data["bot"] = _get_bot_info(self.bot, self.config) self.hass.bus.async_fire(EVENT_TELEGRAM_SENT, event_data, context=context) async_dispatcher_send( self.hass, signal(self.bot), EVENT_TELEGRAM_SENT, event_data ) return out async def send_message( self, message: str, chat_id: int, context: Context | None = None, **kwargs: dict[str, Any], ) -> dict[str, JsonValueType]: """Send a message to one or multiple pre-allowed chat IDs.""" title = kwargs.get(ATTR_TITLE) text = f"{title}\n{message}" if title else message params = self._get_msg_kwargs(kwargs) return await self._send_msg_formatted( self.bot.send_message, params[ATTR_MESSAGE_TAG], text, chat_id=chat_id, parse_mode=params[ATTR_PARSER], disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV], disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) async def send_media_group( self, chat_id: int, context: Context | None = None, **kwargs: Any, ) -> dict[str, JsonValueType]: """Send media group to a chat ID. Returns a dict mapping each chat_id to message_ids. """ params = self._get_msg_kwargs(kwargs) media: list[ InputMediaAudio | InputMediaDocument | InputMediaPhoto | InputMediaVideo ] = [] input_media: list[dict[str, Any]] = kwargs[ATTR_MEDIA] for entry in input_media: file_content = await load_data( self.hass, url=entry.get(ATTR_URL), filepath=entry.get(ATTR_FILE), username=entry.get(ATTR_USERNAME, ""), password=entry.get(ATTR_PASSWORD, ""), authentication=entry.get(ATTR_AUTHENTICATION), verify_ssl=entry[ATTR_VERIFY_SSL], ) _LOGGER.debug("downloaded: %s", entry.get(ATTR_URL) or entry.get(ATTR_FILE)) caption: str | None = entry.get(ATTR_CAPTION) if entry[ATTR_MEDIA_TYPE] == InputMediaType.AUDIO: media.append(InputMediaAudio(file_content, caption=caption)) elif entry[ATTR_MEDIA_TYPE] == InputMediaType.DOCUMENT: media.append(InputMediaDocument(file_content, caption=caption)) elif entry[ATTR_MEDIA_TYPE] == InputMediaType.PHOTO: media.append(InputMediaPhoto(file_content, caption=caption)) else: media.append(InputMediaVideo(file_content, caption=caption)) return await self._send_msg_formatted( self.bot.send_media_group, params[ATTR_MESSAGE_TAG], chat_id=chat_id, media=media, disable_notification=params[ATTR_DISABLE_NOTIF], protect_content=kwargs.get(ATTR_PROTECT_CONTENT, False), message_thread_id=params[ATTR_MESSAGE_THREAD_ID], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], parse_mode=params[ATTR_PARSER], context=context, ) async def delete_message( self, chat_id: int, context: Context | None = None, **kwargs: dict[str, Any], ) -> bool: """Delete a previously sent message.""" message_id, _ = self._get_msg_ids(kwargs, chat_id) _LOGGER.debug("Delete message %s in chat ID %s", message_id, chat_id) deleted: bool = await self._send_msg( self.bot.delete_message, None, chat_id, message_id, context=context, ) # reduce message_id anyway: if chat_id in self._last_message_id: # change last msg_id for deque(n_msgs)? self._last_message_id[chat_id] -= 1 return deleted async def edit_message_media( self, media_type: str, chat_id: int, context: Context | None = None, **kwargs: Any, ) -> Any: "Edit message media of a previously sent message." message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id) params = self._get_msg_kwargs(kwargs) _LOGGER.debug( "Edit message media %s in chat ID %s with params: %s", message_id or inline_message_id, chat_id, params, ) file_content = await load_data( self.hass, url=kwargs.get(ATTR_URL), filepath=kwargs.get(ATTR_FILE), username=kwargs.get(ATTR_USERNAME, ""), password=kwargs.get(ATTR_PASSWORD, ""), authentication=kwargs.get(ATTR_AUTHENTICATION), verify_ssl=kwargs.get(ATTR_VERIFY_SSL, False), ) media: InputMedia if media_type == InputMediaType.ANIMATION: media = InputMediaAnimation( file_content, caption=kwargs.get(ATTR_CAPTION), parse_mode=params[ATTR_PARSER], ) elif media_type == InputMediaType.AUDIO: media = InputMediaAudio( file_content, caption=kwargs.get(ATTR_CAPTION), parse_mode=params[ATTR_PARSER], ) elif media_type == InputMediaType.DOCUMENT: media = InputMediaDocument( file_content, caption=kwargs.get(ATTR_CAPTION), parse_mode=params[ATTR_PARSER], ) elif media_type == InputMediaType.PHOTO: media = InputMediaPhoto( file_content, caption=kwargs.get(ATTR_CAPTION), parse_mode=params[ATTR_PARSER], ) else: media = InputMediaVideo( file_content, caption=kwargs.get(ATTR_CAPTION), parse_mode=params[ATTR_PARSER], ) return await self._send_msg( self.bot.edit_message_media, params[ATTR_MESSAGE_TAG], media=media, chat_id=chat_id, message_id=message_id, inline_message_id=inline_message_id, reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], context=context, ) async def edit_message( self, type_edit: str, chat_id: int, context: Context | None = None, **kwargs: dict[str, Any], ) -> Any: """Edit a previously sent message.""" message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id) params = self._get_msg_kwargs(kwargs) _LOGGER.debug( "Edit message %s in chat ID %s with params: %s", message_id or inline_message_id, chat_id, params, ) if type_edit == SERVICE_EDIT_MESSAGE: message = kwargs.get(ATTR_MESSAGE) title = kwargs.get(ATTR_TITLE) text = f"{title}\n{message}" if title else message _LOGGER.debug("Editing message with ID %s", message_id or inline_message_id) return await self._send_msg( self.bot.edit_message_text, params[ATTR_MESSAGE_TAG], text, chat_id=chat_id, message_id=message_id, inline_message_id=inline_message_id, parse_mode=params[ATTR_PARSER], disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], context=context, ) if type_edit == SERVICE_EDIT_CAPTION: return await self._send_msg( self.bot.edit_message_caption, params[ATTR_MESSAGE_TAG], chat_id=chat_id, message_id=message_id, inline_message_id=inline_message_id, caption=kwargs.get(ATTR_CAPTION), reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], parse_mode=params[ATTR_PARSER], context=context, ) return await self._send_msg( self.bot.edit_message_reply_markup, params[ATTR_MESSAGE_TAG], chat_id=chat_id, message_id=message_id, inline_message_id=inline_message_id, reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], context=context, ) async def answer_callback_query( self, message: str | None, callback_query_id: str, show_alert: bool = False, context: Context | None = None, **kwargs: dict[str, Any], ) -> None: """Answer a callback originated with a press in an inline keyboard.""" params = self._get_msg_kwargs(kwargs) _LOGGER.debug( "Answer callback query with callback ID %s: %s, alert: %s", callback_query_id, message, show_alert, ) await self._send_msg( self.bot.answer_callback_query, params[ATTR_MESSAGE_TAG], callback_query_id, text=message, show_alert=show_alert, read_timeout=params[ATTR_TIMEOUT], context=context, ) async def send_chat_action( self, chat_id: int, chat_action: str = "", context: Context | None = None, **kwargs: Any, ) -> dict[str, JsonValueType]: """Send a chat action to pre-allowed chat IDs.""" result: dict[str, JsonValueType] = {} _LOGGER.debug("Send action %s in chat ID %s", chat_action, chat_id) is_successful = await self._send_msg( self.bot.send_chat_action, None, chat_id=chat_id, action=chat_action, message_thread_id=kwargs.get(ATTR_MESSAGE_THREAD_ID), context=context, ) result[str(chat_id)] = is_successful return result async def send_file( self, file_type: str, context: Context | None = None, **kwargs: Any, ) -> dict[str, JsonValueType]: """Send a photo, sticker, video, or document.""" params = self._get_msg_kwargs(kwargs) file_content = await load_data( self.hass, url=kwargs.get(ATTR_URL), filepath=kwargs.get(ATTR_FILE), username=kwargs.get(ATTR_USERNAME, ""), password=kwargs.get(ATTR_PASSWORD, ""), authentication=kwargs.get(ATTR_AUTHENTICATION), verify_ssl=kwargs.get(ATTR_VERIFY_SSL, False), ) if file_type == SERVICE_SEND_PHOTO: return await self._send_msg_formatted( self.bot.send_photo, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], photo=file_content, caption=kwargs.get(ATTR_CAPTION), disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], parse_mode=params[ATTR_PARSER], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) if file_type == SERVICE_SEND_STICKER: return await self._send_msg_formatted( self.bot.send_sticker, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], sticker=file_content, disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) if file_type == SERVICE_SEND_VIDEO: return await self._send_msg_formatted( self.bot.send_video, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], video=file_content, caption=kwargs.get(ATTR_CAPTION), disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], parse_mode=params[ATTR_PARSER], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) if file_type == SERVICE_SEND_DOCUMENT: return await self._send_msg_formatted( self.bot.send_document, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], document=file_content, caption=kwargs.get(ATTR_CAPTION), disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], parse_mode=params[ATTR_PARSER], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) if file_type == SERVICE_SEND_VOICE: return await self._send_msg_formatted( self.bot.send_voice, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], voice=file_content, caption=kwargs.get(ATTR_CAPTION), disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) # SERVICE_SEND_ANIMATION return await self._send_msg_formatted( self.bot.send_animation, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], animation=file_content, caption=kwargs.get(ATTR_CAPTION), disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], parse_mode=params[ATTR_PARSER], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) async def send_sticker( self, context: Context | None = None, **kwargs: Any, ) -> dict[str, JsonValueType]: """Send a sticker from a telegram sticker pack.""" params = self._get_msg_kwargs(kwargs) stickerid = kwargs.get(ATTR_STICKER_ID) if stickerid: return await self._send_msg_formatted( self.bot.send_sticker, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], sticker=stickerid, disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], reply_markup=params[ATTR_REPLYMARKUP], read_timeout=params[ATTR_TIMEOUT], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) return await self.send_file(SERVICE_SEND_STICKER, context, **kwargs) async def send_location( self, latitude: Any, longitude: Any, context: Context | None = None, **kwargs: dict[str, Any], ) -> dict[str, JsonValueType]: """Send a location.""" latitude = float(latitude) longitude = float(longitude) params = self._get_msg_kwargs(kwargs) return await self._send_msg_formatted( self.bot.send_location, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], latitude=latitude, longitude=longitude, disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], read_timeout=params[ATTR_TIMEOUT], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) async def send_poll( self, question: str, options: Sequence[str | InputPollOption], is_anonymous: bool | None, allows_multiple_answers: bool | None, context: Context | None = None, **kwargs: dict[str, Any], ) -> dict[str, JsonValueType]: """Send a poll.""" params = self._get_msg_kwargs(kwargs) openperiod = kwargs.get(ATTR_OPEN_PERIOD) return await self._send_msg_formatted( self.bot.send_poll, params[ATTR_MESSAGE_TAG], chat_id=kwargs[ATTR_CHAT_ID], question=question, options=options, is_anonymous=is_anonymous, allows_multiple_answers=allows_multiple_answers, open_period=openperiod, disable_notification=params[ATTR_DISABLE_NOTIF], reply_to_message_id=params[ATTR_REPLY_TO_MSGID], read_timeout=params[ATTR_TIMEOUT], message_thread_id=params[ATTR_MESSAGE_THREAD_ID], context=context, ) async def leave_chat( self, chat_id: int, context: Context | None = None, **kwargs: dict[str, Any], ) -> Any: """Remove bot from chat.""" _LOGGER.debug("Leave from chat ID %s", chat_id) return await self._send_msg(self.bot.leave_chat, None, chat_id, context=context) async def set_message_reaction( self, reaction: str, chat_id: int, is_big: bool = False, context: Context | None = None, **kwargs: dict[str, Any], ) -> None: """Set the bot's reaction for a given message.""" message_id, _ = self._get_msg_ids(kwargs, chat_id) params = self._get_msg_kwargs(kwargs) _LOGGER.debug( "Set reaction to message %s in chat ID %s to %s with params: %s", message_id, chat_id, reaction, params, ) await self._send_msg( self.bot.set_message_reaction, params[ATTR_MESSAGE_TAG], chat_id, message_id, reaction=reaction, is_big=is_big, read_timeout=params[ATTR_TIMEOUT], context=context, ) async def send_message_draft( self, message: str, chat_id: int, draft_id: int, context: Context | None = None, **kwargs: dict[str, Any], ) -> None: """Stream a partial message to a user while the message is being generated.""" params = self._get_msg_kwargs(kwargs) _LOGGER.debug( "Sending message draft %s in chat ID %s with params: %s", draft_id, chat_id, params, ) await self._send_msg( self.bot.send_message_draft, None, chat_id=chat_id, draft_id=draft_id, text=message, message_thread_id=params[ATTR_MESSAGE_THREAD_ID], parse_mode=params[ATTR_PARSER], read_timeout=params[ATTR_TIMEOUT], context=context, ) async def download_file( self, file_id: str, directory_path: str | None = None, file_name: str | None = None, context: Context | None = None, **kwargs: dict[str, Any], ) -> dict[str, JsonValueType]: """Download a file from Telegram.""" if directory_path: try: raise_if_invalid_path(directory_path) except ValueError as err: raise ServiceValidationError( translation_domain=DOMAIN, translation_key="invalid_directory_path", translation_placeholders={"directory_path": directory_path}, ) from err else: directory_path = self.hass.config.path(DOMAIN) if file_name: try: raise_if_invalid_filename(file_name) except ValueError as err: raise ServiceValidationError( translation_domain=DOMAIN, translation_key="invalid_file_name", translation_placeholders={"file_name": file_name}, ) from err file: File = await self._send_msg( self.bot.get_file, None, file_id=file_id, context=context, ) if not file.file_path: raise HomeAssistantError( translation_domain=DOMAIN, translation_key="action_failed", translation_placeholders={ "error": "No file path returned from Telegram" }, ) if not file_name: file_name = os.path.basename(file.file_path) custom_path = os.path.join(directory_path, file_name) await self.hass.async_add_executor_job( self._prepare_download_directory, directory_path ) _LOGGER.debug("Download file %s to %s", file_id, custom_path) try: file_content = await file.download_as_bytearray() await self.hass.async_add_executor_job( Path(custom_path).write_bytes, file_content ) except (RuntimeError, OSError, TelegramError) as exc: raise HomeAssistantError( translation_domain=DOMAIN, translation_key="action_failed", translation_placeholders={"error": str(exc)}, ) from exc return {ATTR_FILE_PATH: custom_path} @staticmethod def _prepare_download_directory(directory_path: str) -> None: """Create download directory if it does not exist.""" if not os.path.exists(directory_path): _LOGGER.debug("directory %s does not exist, creating it", directory_path) os.makedirs(directory_path, exist_ok=True) def initialize_bot(hass: HomeAssistant, p_config: MappingProxyType[str, Any]) -> Bot: """Initialize telegram bot with proxy support.""" api_key: str = p_config[CONF_API_KEY] # set up timeouts to handle large file downloads and uploads # server-side file size limit is 2GB read_timeout = DEFAULT_TIMEOUT_SECONDS media_write_timeout = DEFAULT_TIMEOUT_SECONDS proxy_url: str | None = p_config.get(CONF_PROXY_URL) if proxy_url is not None: proxy = httpx.Proxy(proxy_url) request = HTTPXRequest( connection_pool_size=8, proxy=proxy, read_timeout=read_timeout, media_write_timeout=media_write_timeout, ) get_updates_request = HTTPXRequest(proxy=proxy) else: request = HTTPXRequest( connection_pool_size=8, read_timeout=read_timeout, media_write_timeout=media_write_timeout, ) get_updates_request = None base_url: str = p_config[CONF_API_ENDPOINT] return Bot( token=api_key, base_url=f"{base_url}/bot", base_file_url=f"{base_url}/file/bot", request=request, get_updates_request=get_updates_request, ) async def load_data( hass: HomeAssistant, url: str | None, filepath: str | None, username: str, password: str, authentication: str | None, verify_ssl: bool, num_retries: int = 5, ) -> io.BytesIO: """Load data into ByteIO/File container from a source.""" if url is not None: # Load data from URL params: dict[str, Any] = {} headers: dict[str, str] = {} _validate_credentials_input(authentication, username, password) if authentication == HTTP_BEARER_AUTHENTICATION: headers = {"Authorization": f"Bearer {password}"} elif authentication == HTTP_DIGEST_AUTHENTICATION: params["auth"] = httpx.DigestAuth(username, password) elif authentication == HTTP_BASIC_AUTHENTICATION: params["auth"] = httpx.BasicAuth(username, password) retry_num = 0 async with get_async_client(hass, verify_ssl) as client: while retry_num < num_retries: try: response = await client.get( url, headers=headers, timeout=DEFAULT_TIMEOUT_SECONDS, **params ) except (httpx.HTTPError, httpx.InvalidURL) as err: raise HomeAssistantError( translation_domain=DOMAIN, translation_key="failed_to_load_url", translation_placeholders={"error": str(err)}, ) from err if response.status_code != 200: _LOGGER.warning( "Status code %s (retry #%s) loading %s", response.status_code, retry_num + 1, url, ) else: data = io.BytesIO(response.content) if data.read(): data.seek(0) data.name = url _LOGGER.debug("file downloaded: %s", url) return data _LOGGER.warning("Empty data (retry #%s) in %s)", retry_num + 1, url) retry_num += 1 if retry_num < num_retries: await asyncio.sleep( 1 ) # Add a sleep to allow other async operations to proceed raise HomeAssistantError( translation_domain=DOMAIN, translation_key="failed_to_load_url", translation_placeholders={"error": str(response.status_code)}, ) elif filepath is not None: if hass.config.is_allowed_path(filepath): return await hass.async_add_executor_job(_read_file_as_bytesio, filepath) raise ServiceValidationError( translation_domain=DOMAIN, translation_key="allowlist_external_dirs_error", ) else: raise ServiceValidationError( translation_domain=DOMAIN, translation_key="missing_input", translation_placeholders={"field": "URL or File"}, ) def _validate_credentials_input( authentication: str | None, username: str | None, password: str | None ) -> None: if ( authentication in (HTTP_BASIC_AUTHENTICATION, HTTP_DIGEST_AUTHENTICATION) and not username ): raise ServiceValidationError( translation_domain=DOMAIN, translation_key="missing_input", translation_placeholders={"field": "Username"}, ) if ( authentication in ( HTTP_BASIC_AUTHENTICATION, HTTP_BEARER_AUTHENTICATION, HTTP_BEARER_AUTHENTICATION, ) and not password ): raise ServiceValidationError( translation_domain=DOMAIN, translation_key="missing_input", translation_placeholders={"field": "Password"}, ) def _read_file_as_bytesio(file_path: str) -> io.BytesIO: """Read a file and return it as a BytesIO object.""" try: with open(file_path, "rb") as file: data = io.BytesIO(file.read()) data.name = file_path return data except OSError as err: raise HomeAssistantError( translation_domain=DOMAIN, translation_key="failed_to_load_file", translation_placeholders={"error": str(err)}, ) from err