From 7d2a305996b105bcefe4bc5cd8727487c8ac8ffe Mon Sep 17 00:00:00 2001 From: hanwg Date: Tue, 17 Mar 2026 00:33:00 +0800 Subject: [PATCH] Suggest chat_id for subentry flow for Telegram bot (#165515) Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- homeassistant/components/telegram_bot/bot.py | 8 +- .../components/telegram_bot/config_flow.py | 54 +++++++- .../components/telegram_bot/strings.json | 2 +- tests/components/telegram_bot/conftest.py | 19 +++ .../telegram_bot/test_config_flow.py | 124 ++++++++++++++++++ 5 files changed, 203 insertions(+), 4 deletions(-) diff --git a/homeassistant/components/telegram_bot/bot.py b/homeassistant/components/telegram_bot/bot.py index ff4e487ba1b..eb27d0138ca 100644 --- a/homeassistant/components/telegram_bot/bot.py +++ b/homeassistant/components/telegram_bot/bot.py @@ -141,6 +141,7 @@ class BaseTelegramBot: """Initialize the bot base class.""" self.hass = hass self.config = config + self.most_recent_chat_id: int | None = None self._bot = bot @abstractmethod @@ -150,8 +151,6 @@ class BaseTelegramBot: async def handle_update(self, update: Update, context: CallbackContext) -> bool: """Handle updates from bot application set up by the respective platform.""" _LOGGER.debug("Handling update %s", update) - if not self.authorize_update(update): - return False # establish event type: text, command or callback_query if update.callback_query: @@ -168,6 +167,11 @@ class BaseTelegramBot: _LOGGER.warning("Unhandled update: %s", update) return True + self.most_recent_chat_id = event_data[ATTR_CHAT_ID] + + if not self.authorize_update(update): + return False + event_data["bot"] = _get_bot_info(self._bot, self.config) event_context = Context() diff --git a/homeassistant/components/telegram_bot/config_flow.py b/homeassistant/components/telegram_bot/config_flow.py index e5147b76f8a..09f67904cb4 100644 --- a/homeassistant/components/telegram_bot/config_flow.py +++ b/homeassistant/components/telegram_bot/config_flow.py @@ -619,14 +619,66 @@ class AllowedChatIdsSubEntryFlowHandler(ConfigSubentryFlow): description_placeholders["bot_username"] = f"@{service.bot.username}" description_placeholders["bot_url"] = f"https://t.me/{service.bot.username}" + # suggest chat id based on the most recent chat + suggested_values = {} + description_placeholders["most_recent_chat"] = "Not available" + try: + most_recent_chat = await _get_most_recent_chat(service) + except TelegramError as err: + _LOGGER.warning("Error occurred while fetching recent chat: %s", err) + most_recent_chat = None + if most_recent_chat is not None: + suggested_values[CONF_CHAT_ID] = most_recent_chat[0] + + description_placeholders["most_recent_chat"] = ( + f"{most_recent_chat[1]} ({most_recent_chat[0]})" + if most_recent_chat[1] + else str(most_recent_chat[0]) + ) + return self.async_show_form( step_id="user", - data_schema=vol.Schema({vol.Required(CONF_CHAT_ID): vol.Coerce(int)}), + data_schema=self.add_suggested_values_to_schema( + vol.Schema({vol.Required(CONF_CHAT_ID): vol.Coerce(int)}), + suggested_values, + ), description_placeholders=description_placeholders, errors=errors, ) +async def _get_most_recent_chat( + service: TelegramNotificationService, +) -> tuple[int, str | None] | None: + """Get the most recent chat ID and name. + + For broadcast bot, this is retrieved using get_updates() to find the most recent message received. + For polling or webhook bot, this is retrieved from the runtime data which is updated whenever a message is received. + """ + + if service.app is not None: + # this is either polling or webhook bot + + if service.app.most_recent_chat_id is None: + return None + + chat = await service.bot.get_chat(service.app.most_recent_chat_id) + return (service.app.most_recent_chat_id, chat.effective_name) + + # broadcast bot + updates = await service.bot.get_updates(offset=0) + if updates: + last_update = updates[-1] + if last_update.effective_chat: + chat_name = last_update.effective_chat.effective_name + return ( + last_update.effective_chat.id, + chat_name, + ) + + return None + + async def _async_get_chat_name(bot: Bot, chat_id: int) -> str: try: chat_info: ChatFullInfo = await bot.get_chat(chat_id) diff --git a/homeassistant/components/telegram_bot/strings.json b/homeassistant/components/telegram_bot/strings.json index 3412a65709e..840b926bd5a 100644 --- a/homeassistant/components/telegram_bot/strings.json +++ b/homeassistant/components/telegram_bot/strings.json @@ -105,7 +105,7 @@ "data_description": { "chat_id": "ID representing the user or group chat to which messages can be sent." }, - "description": "Before you proceed, send any message to your bot: [{bot_username}]({bot_url}). This is required because Telegram prevents bots from initiating chats with users.\n\nThen follow these steps to get your chat ID:\n\n1. Open Telegram and start a chat with [{id_bot_username}]({id_bot_url}).\n1. Send any message to the bot.\n1. Your chat ID is in the `ID` field of the bot's response.", + "description": "Before you proceed, send any message to your bot: [{bot_username}]({bot_url}). This is required because Telegram prevents bots from initiating chats with users.\n\nThen follow these steps to get your chat ID:\n\n1. Open Telegram and start a chat with [{id_bot_username}]({id_bot_url}).\n1. Send any message to the bot.\n1. Your chat ID is in the `ID` field of the bot's response.\n\nMost recent chat: {most_recent_chat}", "title": "Add chat" } } diff --git a/tests/components/telegram_bot/conftest.py b/tests/components/telegram_bot/conftest.py index cc6cb02ffc6..b625e3555da 100644 --- a/tests/components/telegram_bot/conftest.py +++ b/tests/components/telegram_bot/conftest.py @@ -12,6 +12,7 @@ from telegram import ( Chat, ChatFullInfo, Message, + Update, User, WebhookInfo, ) @@ -137,6 +138,24 @@ def mock_external_calls() -> Generator[None]: patch.object(BotMock, "send_animation", return_value=message), patch.object(BotMock, "send_location", return_value=message), patch.object(BotMock, "send_poll", return_value=message), + patch.object( + BotMock, + "get_updates", + return_value=( + Update( + 1, + Message( + 1, + datetime.now(), + Chat( + id=123456, + type=ChatType.PRIVATE, + first_name="mock first_name", + ), + ), + ), + ), + ), patch.object(BotMock, "log_out", return_value=True), patch("telegram.ext.Updater._bootstrap"), ): diff --git a/tests/components/telegram_bot/test_config_flow.py b/tests/components/telegram_bot/test_config_flow.py index 00e5875d2af..72043e0c85f 100644 --- a/tests/components/telegram_bot/test_config_flow.py +++ b/tests/components/telegram_bot/test_config_flow.py @@ -23,12 +23,14 @@ from homeassistant.components.telegram_bot.const import ( SECTION_ADVANCED_SETTINGS, SUBENTRY_TYPE_ALLOWED_CHAT_IDS, ) +from homeassistant.components.telegram_bot.webhooks import TELEGRAM_WEBHOOK_URL from homeassistant.config_entries import SOURCE_USER, ConfigSubentry from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL from homeassistant.core import HomeAssistant from homeassistant.data_entry_flow import FlowResultType from tests.common import MockConfigEntry, pytest +from tests.typing import ClientSessionGenerator @pytest.fixture @@ -541,6 +543,7 @@ async def test_subentry_flow( **DESCRIPTION_PLACEHOLDERS, "bot_username": "@mock_bot", "bot_url": "https://t.me/mock_bot", + "most_recent_chat": "mock first_name (123456)", } result = await hass.config_entries.subentries.async_configure( @@ -620,6 +623,127 @@ async def test_subentry_flow_chat_error( assert result["reason"] == "already_configured" +async def test_subentry_flow_webhook_with_update( + hass: HomeAssistant, + webhook_bot, + hass_client: ClientSessionGenerator, + update_message_text, + mock_generate_secret_token, +) -> None: + """Test subentry flow with webhook bot.""" + + # send a message to the webhook to create a recent chat + client = await hass_client() + response = await client.post( + f"{TELEGRAM_WEBHOOK_URL}_123456", + json=update_message_text, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, + ) + assert response.status == 200 + + # start the subentry flow + config_entry = hass.config_entries.async_entries(DOMAIN)[0] + result = await hass.config_entries.subentries.async_init( + (config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS), + context={"source": SOURCE_USER}, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + assert result["description_placeholders"] == { + **DESCRIPTION_PLACEHOLDERS, + "bot_username": "@mock_bot", + "bot_url": "https://t.me/mock_bot", + "most_recent_chat": "mock title (1111111)", + } + + +async def test_subentry_flow_polling_bot_without_update( + hass: HomeAssistant, + mock_polling_config_entry: MockConfigEntry, + mock_external_calls: None, + mock_polling_calls: None, +) -> None: + """Test subentry flow with polling bot.""" + + mock_polling_config_entry.add_to_hass(hass) + assert await hass.config_entries.async_setup(mock_polling_config_entry.entry_id) + await hass.async_block_till_done() + + result = await hass.config_entries.subentries.async_init( + (mock_polling_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS), + context={"source": SOURCE_USER}, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + assert result["description_placeholders"] == { + **DESCRIPTION_PLACEHOLDERS, + "bot_username": "@mock_bot", + "bot_url": "https://t.me/mock_bot", + "most_recent_chat": "Not available", + } + + +async def test_subentry_flow_broadcast_without_update( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test subentry flow where broadcast bot did not receive any messages.""" + + mock_broadcast_config_entry.add_to_hass(hass) + assert await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + with patch( + "homeassistant.components.telegram_bot.bot.Bot.get_updates", return_value=() + ): + result = await hass.config_entries.subentries.async_init( + (mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS), + context={"source": SOURCE_USER}, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + assert result["description_placeholders"] == { + **DESCRIPTION_PLACEHOLDERS, + "bot_username": "@mock_bot", + "bot_url": "https://t.me/mock_bot", + "most_recent_chat": "Not available", + } + + +async def test_subentry_flow_broadcast_update_error( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test subentry flow where broadcast bot encounter error while receiving messages.""" + + mock_broadcast_config_entry.add_to_hass(hass) + assert await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + with patch( + "homeassistant.components.telegram_bot.bot.Bot.get_updates", + side_effect=NetworkError("mock network error"), + ): + result = await hass.config_entries.subentries.async_init( + (mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS), + context={"source": SOURCE_USER}, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + assert result["description_placeholders"] == { + **DESCRIPTION_PLACEHOLDERS, + "bot_username": "@mock_bot", + "bot_url": "https://t.me/mock_bot", + "most_recent_chat": "Not available", + } + + async def test_duplicate_entry(hass: HomeAssistant) -> None: """Test user flow with duplicated entries."""