1
0
mirror of https://github.com/home-assistant/core.git synced 2026-04-17 07:34:07 +01:00

Suggest chat_id for subentry flow for Telegram bot (#165515)

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
hanwg
2026-03-17 00:33:00 +08:00
committed by GitHub
parent 6945418805
commit 7d2a305996
5 changed files with 203 additions and 4 deletions

View File

@@ -141,6 +141,7 @@ class BaseTelegramBot:
"""Initialize the bot base class."""
self.hass = hass
self.config = config
self.most_recent_chat_id: int | None = None
self._bot = bot
@abstractmethod
@@ -150,8 +151,6 @@ class BaseTelegramBot:
async def handle_update(self, update: Update, context: CallbackContext) -> bool:
"""Handle updates from bot application set up by the respective platform."""
_LOGGER.debug("Handling update %s", update)
if not self.authorize_update(update):
return False
# establish event type: text, command or callback_query
if update.callback_query:
@@ -168,6 +167,11 @@ class BaseTelegramBot:
_LOGGER.warning("Unhandled update: %s", update)
return True
self.most_recent_chat_id = event_data[ATTR_CHAT_ID]
if not self.authorize_update(update):
return False
event_data["bot"] = _get_bot_info(self._bot, self.config)
event_context = Context()

View File

@@ -619,14 +619,66 @@ class AllowedChatIdsSubEntryFlowHandler(ConfigSubentryFlow):
description_placeholders["bot_username"] = f"@{service.bot.username}"
description_placeholders["bot_url"] = f"https://t.me/{service.bot.username}"
# suggest chat id based on the most recent chat
suggested_values = {}
description_placeholders["most_recent_chat"] = "Not available"
try:
most_recent_chat = await _get_most_recent_chat(service)
except TelegramError as err:
_LOGGER.warning("Error occurred while fetching recent chat: %s", err)
most_recent_chat = None
if most_recent_chat is not None:
suggested_values[CONF_CHAT_ID] = most_recent_chat[0]
description_placeholders["most_recent_chat"] = (
f"{most_recent_chat[1]} ({most_recent_chat[0]})"
if most_recent_chat[1]
else str(most_recent_chat[0])
)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required(CONF_CHAT_ID): vol.Coerce(int)}),
data_schema=self.add_suggested_values_to_schema(
vol.Schema({vol.Required(CONF_CHAT_ID): vol.Coerce(int)}),
suggested_values,
),
description_placeholders=description_placeholders,
errors=errors,
)
async def _get_most_recent_chat(
service: TelegramNotificationService,
) -> tuple[int, str | None] | None:
"""Get the most recent chat ID and name.
For broadcast bot, this is retrieved using get_updates() to find the most recent message received.
For polling or webhook bot, this is retrieved from the runtime data which is updated whenever a message is received.
"""
if service.app is not None:
# this is either polling or webhook bot
if service.app.most_recent_chat_id is None:
return None
chat = await service.bot.get_chat(service.app.most_recent_chat_id)
return (service.app.most_recent_chat_id, chat.effective_name)
# broadcast bot
updates = await service.bot.get_updates(offset=0)
if updates:
last_update = updates[-1]
if last_update.effective_chat:
chat_name = last_update.effective_chat.effective_name
return (
last_update.effective_chat.id,
chat_name,
)
return None
async def _async_get_chat_name(bot: Bot, chat_id: int) -> str:
try:
chat_info: ChatFullInfo = await bot.get_chat(chat_id)

View File

@@ -105,7 +105,7 @@
"data_description": {
"chat_id": "ID representing the user or group chat to which messages can be sent."
},
"description": "Before you proceed, send any message to your bot: [{bot_username}]({bot_url}). This is required because Telegram prevents bots from initiating chats with users.\n\nThen follow these steps to get your chat ID:\n\n1. Open Telegram and start a chat with [{id_bot_username}]({id_bot_url}).\n1. Send any message to the bot.\n1. Your chat ID is in the `ID` field of the bot's response.",
"description": "Before you proceed, send any message to your bot: [{bot_username}]({bot_url}). This is required because Telegram prevents bots from initiating chats with users.\n\nThen follow these steps to get your chat ID:\n\n1. Open Telegram and start a chat with [{id_bot_username}]({id_bot_url}).\n1. Send any message to the bot.\n1. Your chat ID is in the `ID` field of the bot's response.\n\nMost recent chat: {most_recent_chat}",
"title": "Add chat"
}
}

View File

@@ -12,6 +12,7 @@ from telegram import (
Chat,
ChatFullInfo,
Message,
Update,
User,
WebhookInfo,
)
@@ -137,6 +138,24 @@ def mock_external_calls() -> Generator[None]:
patch.object(BotMock, "send_animation", return_value=message),
patch.object(BotMock, "send_location", return_value=message),
patch.object(BotMock, "send_poll", return_value=message),
patch.object(
BotMock,
"get_updates",
return_value=(
Update(
1,
Message(
1,
datetime.now(),
Chat(
id=123456,
type=ChatType.PRIVATE,
first_name="mock first_name",
),
),
),
),
),
patch.object(BotMock, "log_out", return_value=True),
patch("telegram.ext.Updater._bootstrap"),
):

View File

@@ -23,12 +23,14 @@ from homeassistant.components.telegram_bot.const import (
SECTION_ADVANCED_SETTINGS,
SUBENTRY_TYPE_ALLOWED_CHAT_IDS,
)
from homeassistant.components.telegram_bot.webhooks import TELEGRAM_WEBHOOK_URL
from homeassistant.config_entries import SOURCE_USER, ConfigSubentry
from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry, pytest
from tests.typing import ClientSessionGenerator
@pytest.fixture
@@ -541,6 +543,7 @@ async def test_subentry_flow(
**DESCRIPTION_PLACEHOLDERS,
"bot_username": "@mock_bot",
"bot_url": "https://t.me/mock_bot",
"most_recent_chat": "mock first_name (123456)",
}
result = await hass.config_entries.subentries.async_configure(
@@ -620,6 +623,127 @@ async def test_subentry_flow_chat_error(
assert result["reason"] == "already_configured"
async def test_subentry_flow_webhook_with_update(
hass: HomeAssistant,
webhook_bot,
hass_client: ClientSessionGenerator,
update_message_text,
mock_generate_secret_token,
) -> None:
"""Test subentry flow with webhook bot."""
# send a message to the webhook to create a recent chat
client = await hass_client()
response = await client.post(
f"{TELEGRAM_WEBHOOK_URL}_123456",
json=update_message_text,
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
)
assert response.status == 200
# start the subentry flow
config_entry = hass.config_entries.async_entries(DOMAIN)[0]
result = await hass.config_entries.subentries.async_init(
(config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["description_placeholders"] == {
**DESCRIPTION_PLACEHOLDERS,
"bot_username": "@mock_bot",
"bot_url": "https://t.me/mock_bot",
"most_recent_chat": "mock title (1111111)",
}
async def test_subentry_flow_polling_bot_without_update(
hass: HomeAssistant,
mock_polling_config_entry: MockConfigEntry,
mock_external_calls: None,
mock_polling_calls: None,
) -> None:
"""Test subentry flow with polling bot."""
mock_polling_config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(mock_polling_config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.subentries.async_init(
(mock_polling_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["description_placeholders"] == {
**DESCRIPTION_PLACEHOLDERS,
"bot_username": "@mock_bot",
"bot_url": "https://t.me/mock_bot",
"most_recent_chat": "Not available",
}
async def test_subentry_flow_broadcast_without_update(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test subentry flow where broadcast bot did not receive any messages."""
mock_broadcast_config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
with patch(
"homeassistant.components.telegram_bot.bot.Bot.get_updates", return_value=()
):
result = await hass.config_entries.subentries.async_init(
(mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["description_placeholders"] == {
**DESCRIPTION_PLACEHOLDERS,
"bot_username": "@mock_bot",
"bot_url": "https://t.me/mock_bot",
"most_recent_chat": "Not available",
}
async def test_subentry_flow_broadcast_update_error(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test subentry flow where broadcast bot encounter error while receiving messages."""
mock_broadcast_config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
with patch(
"homeassistant.components.telegram_bot.bot.Bot.get_updates",
side_effect=NetworkError("mock network error"),
):
result = await hass.config_entries.subentries.async_init(
(mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["description_placeholders"] == {
**DESCRIPTION_PLACEHOLDERS,
"bot_username": "@mock_bot",
"bot_url": "https://t.me/mock_bot",
"most_recent_chat": "Not available",
}
async def test_duplicate_entry(hass: HomeAssistant) -> None:
"""Test user flow with duplicated entries."""