fix(telegram): escape dynamic markdown in callback flows

Use MarkdownV2 formatting for Telegram callback follow-ups and interactive prompts where dynamic names or user text can break legacy Markdown parsing. Add regression coverage for reload-mcp, model picker, approval callbacks, and update prompts.
This commit is contained in:
Phuong Lambert 2026-05-13 11:51:38 +07:00 committed by Teknium
parent 524490a409
commit a694040520
4 changed files with 174 additions and 46 deletions

View file

@ -2070,7 +2070,7 @@ class TelegramAdapter(BasePlatformAdapter):
return SendResult(success=False, error="Not connected")
try:
default_hint = f" (default: {default})" if default else ""
text = f"⚕ *Update needs your input:*\n\n{prompt}{default_hint}"
text = self.format_message(f"⚕ *Update needs your input:*\n\n{prompt}{default_hint}")
keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("✓ Yes", callback_data="update_prompt:y"),
@ -2082,7 +2082,7 @@ class TelegramAdapter(BasePlatformAdapter):
msg = await self._send_message_with_thread_fallback(
chat_id=int(chat_id),
text=text,
parse_mode=ParseMode.MARKDOWN,
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=keyboard,
reply_to_message_id=reply_to_id,
**self._thread_kwargs_for_send(
@ -2334,11 +2334,13 @@ class TelegramAdapter(BasePlatformAdapter):
keyboard = InlineKeyboardMarkup(rows)
provider_label = get_label(current_provider)
text = (
f"⚙ *Model Configuration*\n\n"
f"Current model: `{current_model or 'unknown'}`\n"
f"Provider: {provider_label}\n\n"
f"Select a provider:"
text = self.format_message(
(
f"⚙ *Model Configuration*\n\n"
f"Current model: `{current_model or 'unknown'}`\n"
f"Provider: {provider_label}\n\n"
f"Select a provider:"
)
)
thread_id = metadata.get("thread_id") if metadata else None
@ -2346,7 +2348,7 @@ class TelegramAdapter(BasePlatformAdapter):
msg = await self._send_message_with_thread_fallback(
chat_id=int(chat_id),
text=text,
parse_mode=ParseMode.MARKDOWN,
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=keyboard,
reply_to_message_id=reply_to_id,
**self._thread_kwargs_for_send(
@ -2456,12 +2458,14 @@ class TelegramAdapter(BasePlatformAdapter):
extra = f"\n_{total - shown} more available — type `/model <name>` directly_" if total > shown else ""
await query.edit_message_text(
text=(
f"⚙ *Model Configuration*\n\n"
f"Provider: *{pname}*{page_info}\n"
f"Select a model:{extra}"
text=self.format_message(
(
f"⚙ *Model Configuration*\n\n"
f"Provider: *{pname}*{page_info}\n"
f"Select a model:{extra}"
)
),
parse_mode=ParseMode.MARKDOWN,
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=keyboard,
)
await query.answer()
@ -2490,12 +2494,14 @@ class TelegramAdapter(BasePlatformAdapter):
extra = f"\n_{total - shown} more available — type `/model <name>` directly_" if total > shown else ""
await query.edit_message_text(
text=(
f"⚙ *Model Configuration*\n\n"
f"Provider: *{pname}*{page_info}\n"
f"Select a model:{extra}"
text=self.format_message(
(
f"⚙ *Model Configuration*\n\n"
f"Provider: *{pname}*{page_info}\n"
f"Select a model:{extra}"
)
),
parse_mode=ParseMode.MARKDOWN,
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=keyboard,
)
await query.answer()
@ -2528,22 +2534,22 @@ class TelegramAdapter(BasePlatformAdapter):
result_text = f"Error switching model: {exc}"
# Edit message to show confirmation, remove buttons
try:
await query.edit_message_text(
text=result_text,
parse_mode=ParseMode.MARKDOWN,
reply_markup=None,
)
except Exception:
# Markdown parse failure — retry as plain text
try:
await query.edit_message_text(
text=result_text,
parse_mode=None,
text=self.format_message(result_text),
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=None,
)
except Exception:
pass
# Markdown parse failure — retry as plain text
try:
await query.edit_message_text(
text=result_text,
parse_mode=None,
reply_markup=None,
)
except Exception:
pass
await query.answer(text="Model switched!")
# Clean up state
@ -2571,13 +2577,15 @@ class TelegramAdapter(BasePlatformAdapter):
provider_label = state["current_provider"]
await query.edit_message_text(
text=(
f"⚙ *Model Configuration*\n\n"
f"Current model: `{state['current_model'] or 'unknown'}`\n"
f"Provider: {provider_label}\n\n"
f"Select a provider:"
text=self.format_message(
(
f"⚙ *Model Configuration*\n\n"
f"Current model: `{state['current_model'] or 'unknown'}`\n"
f"Provider: {provider_label}\n\n"
f"Select a provider:"
)
),
parse_mode=ParseMode.MARKDOWN,
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=keyboard,
)
await query.answer()
@ -2660,8 +2668,8 @@ class TelegramAdapter(BasePlatformAdapter):
# Edit message to show decision, remove buttons
try:
await query.edit_message_text(
text=f"{label} by {user_display}",
parse_mode=ParseMode.MARKDOWN,
text=self.format_message(f"{label} by {user_display}"),
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=None,
)
except Exception:
@ -2714,8 +2722,8 @@ class TelegramAdapter(BasePlatformAdapter):
try:
await query.edit_message_text(
text=f"{label} by {user_display}",
parse_mode=ParseMode.MARKDOWN,
text=self.format_message(f"{label} by {user_display}"),
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=None,
)
except Exception:
@ -2740,8 +2748,8 @@ class TelegramAdapter(BasePlatformAdapter):
prompt_message_id = getattr(query.message, "message_id", None)
send_kwargs: Dict[str, Any] = {
"chat_id": int(query.message.chat_id),
"text": result_text,
"parse_mode": ParseMode.MARKDOWN,
"text": self.format_message(result_text),
"parse_mode": ParseMode.MARKDOWN_V2,
**self._link_preview_kwargs(),
}
chat_type_value = getattr(chat_type, "value", chat_type)
@ -2901,8 +2909,8 @@ class TelegramAdapter(BasePlatformAdapter):
label = "Yes" if answer == "y" else "No"
try:
await query.edit_message_text(
text=f"⚕ Update prompt answered: *{label}*",
parse_mode=ParseMode.MARKDOWN,
text=self.format_message(f"⚕ Update prompt answered: *{label}*"),
parse_mode=ParseMode.MARKDOWN_V2,
reply_markup=None,
)
except Exception:

View file

@ -195,6 +195,29 @@ class TestTelegramExecApproval:
or kwargs.get("link_preview_options") is not None
)
@pytest.mark.asyncio
async def test_send_update_prompt_escapes_dynamic_prompt(self):
adapter = _make_adapter()
sent = {}
async def mock_send_message(**kwargs):
sent.update(kwargs)
return SimpleNamespace(message_id=55)
adapter._bot.send_message = AsyncMock(side_effect=mock_send_message)
result = await adapter.send_update_prompt(
chat_id="12345",
prompt="Fix [issue]_1 and verify *markdown*",
default="alpha_beta",
metadata={"thread_id": "999"},
)
assert result.success is True
assert "MARKDOWN_V2" in repr(sent["parse_mode"])
assert "Fix \\[issue\\]\\_1" in sent["text"]
assert "alpha\\_beta" in sent["text"]
@pytest.mark.asyncio
async def test_truncates_long_command(self):
adapter = _make_adapter()
@ -210,9 +233,6 @@ class TestTelegramExecApproval:
kwargs = adapter._bot.send_message.call_args[1]
assert "..." in kwargs["text"]
assert len(kwargs["text"]) < 5000
# ===========================================================================
# _handle_callback_query — approval button clicks
# ===========================================================================
@ -251,6 +271,34 @@ class TestTelegramApprovalCallback:
# State should be cleaned up
assert 1 not in adapter._approval_state
@pytest.mark.asyncio
async def test_approval_callback_escapes_dynamic_user_name(self):
adapter = _make_adapter()
adapter._approval_state[3] = "agent:main:telegram:group:12345:99"
query = AsyncMock()
query.data = "ea:once:3"
query.message = MagicMock()
query.message.chat_id = 12345
query.from_user = MagicMock()
query.from_user.first_name = "Alice_Bob"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
query.from_user.id = "12345"
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
with patch("tools.approval.resolve_gateway_approval", return_value=1):
await adapter._handle_callback_query(update, context)
edit_kwargs = query.edit_message_text.call_args[1]
assert "MARKDOWN_V2" in repr(edit_kwargs["parse_mode"])
assert "Alice\\_Bob" in edit_kwargs["text"]
assert "Approved once" in edit_kwargs["text"]
@pytest.mark.asyncio
async def test_deny_button(self):
adapter = _make_adapter()

View file

@ -210,6 +210,19 @@ class TestFormatMessageBoldItalic:
assert "*bold*" in result
assert "_italic_" in result
def test_reload_mcp_summary_escapes_dynamic_server_names(self, adapter):
content = (
"🔄 **MCP Servers Reloaded**\n"
"♻️ Reconnected: agent_one, tool[beta]\n"
" Added: alpha*prod\n"
"🔧 3 tool(s) available from 2 server(s)"
)
result = adapter.format_message(content)
assert "*MCP Servers Reloaded*" in result
assert "agent\\_one" in result
assert "tool\\[beta\\]" in result
assert "alpha\\*prod" in result
# =========================================================================
# format_message - headers

View file

@ -43,6 +43,65 @@ def _make_adapter():
class TestTelegramModelPicker:
@pytest.mark.asyncio
async def test_send_model_picker_escapes_dynamic_provider_label(self):
adapter = _make_adapter()
sent = {}
async def mock_send_message(**kwargs):
sent.update(kwargs)
return SimpleNamespace(message_id=101)
adapter._bot.send_message = AsyncMock(side_effect=mock_send_message)
result = await adapter.send_model_picker(
chat_id="12345",
providers=[
{"slug": "provider_one", "name": "Provider One", "total_models": 1, "is_current": True}
],
current_model="model_1",
current_provider="provider_one",
session_key="s",
on_model_selected=AsyncMock(),
metadata={"thread_id": "99999"},
)
assert result.success is True
assert "MARKDOWN_V2" in repr(sent["parse_mode"])
assert "provider\\_one" in sent["text"]
assert "`model_1`" in sent["text"]
@pytest.mark.asyncio
async def test_back_button_escapes_dynamic_provider_label(self):
adapter = _make_adapter()
adapter._model_picker_state["12345"] = {
"providers": [{"slug": "provider_one", "name": "Provider One", "total_models": 1, "is_current": True}],
"current_model": "model_1",
"current_provider": "provider_one",
"session_key": "s",
"on_model_selected": AsyncMock(),
"msg_id": 42,
}
query = AsyncMock()
query.data = "mb"
query.message = MagicMock()
query.message.chat_id = 12345
query.from_user = MagicMock()
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
await adapter._handle_model_picker_callback(query, "mb", "12345")
edit_kwargs = query.edit_message_text.call_args[1]
assert "MARKDOWN_V2" in repr(edit_kwargs["parse_mode"])
assert "provider\\_one" in edit_kwargs["text"]
assert "`model_1`" in edit_kwargs["text"]
@pytest.mark.asyncio
async def test_retries_without_thread_when_thread_not_found(self):
adapter = _make_adapter()