fix(telegram): preserve topic metadata on overflow edits

This commit is contained in:
Maxim Esipov 2026-05-17 19:10:13 +03:00 committed by Teknium
parent c66efcff32
commit 3ec28f34ca
5 changed files with 172 additions and 30 deletions

View file

@ -1839,6 +1839,7 @@ class TelegramAdapter(BasePlatformAdapter):
content: str,
*,
finalize: bool = False,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Edit a previously sent Telegram message.
@ -1857,7 +1858,7 @@ class TelegramAdapter(BasePlatformAdapter):
# without round-tripping a doomed edit.
if utf16_len(content) > self.MAX_MESSAGE_LENGTH:
return await self._edit_overflow_split(
chat_id, message_id, content, finalize=finalize,
chat_id, message_id, content, finalize=finalize, metadata=metadata,
)
try:
@ -1902,7 +1903,7 @@ class TelegramAdapter(BasePlatformAdapter):
self.name, utf16_len(content), self.MAX_MESSAGE_LENGTH,
)
return await self._edit_overflow_split(
chat_id, message_id, content, finalize=finalize,
chat_id, message_id, content, finalize=finalize, metadata=metadata,
)
# Flood control / RetryAfter — short waits are retried inline,
# long waits return a failure immediately so streaming can fall back
@ -1973,6 +1974,7 @@ class TelegramAdapter(BasePlatformAdapter):
content: str,
*,
finalize: bool,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Split an oversized edit across the existing message + continuations.
@ -2044,8 +2046,16 @@ class TelegramAdapter(BasePlatformAdapter):
# fallback, mirroring send().
continuation_ids: list[str] = []
prev_id = message_id
thread_id = self._metadata_thread_id(metadata)
for chunk in chunks[1:]:
sent_msg = None
reply_to_id = int(prev_id) if prev_id else None
thread_kwargs = self._thread_kwargs_for_send(
chat_id,
thread_id,
metadata,
reply_to_message_id=reply_to_id,
)
for use_markdown in (True, False) if finalize else (False,):
try:
text = self.format_message(chunk) if use_markdown else chunk
@ -2053,16 +2063,31 @@ class TelegramAdapter(BasePlatformAdapter):
chat_id=int(chat_id),
text=text,
parse_mode=ParseMode.MARKDOWN_V2 if use_markdown else None,
reply_to_message_id=int(prev_id) if prev_id else None,
reply_to_message_id=reply_to_id,
**thread_kwargs,
**self._link_preview_kwargs(),
**self._notification_kwargs(metadata),
)
break
except Exception as send_err:
if "reply message not found" in str(send_err).lower():
# Drop the reply anchor and try again.
# Drop the reply anchor and try again. Private DM
# topic fallback needs the anchor and topic id together;
# forum topics can still safely keep message_thread_id.
retry_thread_kwargs = (
{}
if metadata and metadata.get("telegram_dm_topic_reply_fallback")
else self._thread_kwargs_for_send(
chat_id, thread_id, metadata, reply_to_message_id=None
)
)
try:
sent_msg = await self._bot.send_message(
chat_id=int(chat_id),
text=chunk,
**retry_thread_kwargs,
**self._link_preview_kwargs(),
**self._notification_kwargs(metadata),
)
break
except Exception as _retry_err:

View file

@ -15378,6 +15378,32 @@ class GatewayRunner:
_raw_progress_limit - (64 if _raw_progress_limit > 128 else 0),
)
# Detect whether the adapter's edit_message accepts metadata so
# overflow edits preserve Telegram topic/thread routing (#27487).
_edit_accepts_metadata = False
if _progress_metadata:
try:
_edit_params = inspect.signature(adapter.edit_message).parameters
_edit_accepts_metadata = (
"metadata" in _edit_params
or any(
param.kind is inspect.Parameter.VAR_KEYWORD
for param in _edit_params.values()
)
)
except (TypeError, ValueError):
_edit_accepts_metadata = False
async def _edit_progress_message(message_id: str, content: str):
kwargs = {
"chat_id": source.chat_id,
"message_id": message_id,
"content": content,
}
if _edit_accepts_metadata:
kwargs["metadata"] = _progress_metadata
return await adapter.edit_message(**kwargs)
def _progress_text(lines: list) -> str:
return "\n".join(str(line) for line in lines)
@ -15429,11 +15455,7 @@ class GatewayRunner:
first_text = _progress_text(groups[0])
if progress_msg_id is not None:
result = await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=first_text,
)
result = await _edit_progress_message(progress_msg_id, first_text)
if not result.success:
can_edit = False
# Fall back to the existing non-edit behavior below.
@ -15532,11 +15554,7 @@ class GatewayRunner:
if can_edit and progress_msg_id is not None:
# Try to edit the existing progress message
full_text = "\n".join(progress_lines)
result = await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=full_text,
)
result = await _edit_progress_message(progress_msg_id, full_text)
if not result.success:
_err = (getattr(result, "error", "") or "").lower()
# Transient network errors (ConnectError, timeouts)
@ -15622,11 +15640,7 @@ class GatewayRunner:
if can_edit and progress_lines and progress_msg_id:
_pending_text = _progress_text(progress_lines)
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=_pending_text,
)
await _edit_progress_message(progress_msg_id, _pending_text)
except Exception:
pass
progress_msg_id = None
@ -15644,11 +15658,7 @@ class GatewayRunner:
if can_edit and progress_lines and progress_msg_id:
full_text = _progress_text(progress_lines)
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=full_text,
)
await _edit_progress_message(progress_msg_id, full_text)
except Exception:
pass
return

View file

@ -16,6 +16,7 @@ Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
from __future__ import annotations
import asyncio
import inspect
import logging
import queue
import re
@ -197,6 +198,35 @@ class GatewayStreamConsumer:
the subsequent cosmetic edit (cursor removal) failed."""
return self._final_content_delivered
async def _edit_message(
self,
*,
message_id: str,
content: str,
finalize: bool = False,
):
"""Edit via the adapter, passing routing metadata when supported."""
kwargs = {
"chat_id": self.chat_id,
"message_id": message_id,
"content": content,
}
# Keep the long-standing stream-consumer contract: concrete adapters
# must accept finalize= even when it is False (guarded by tests).
kwargs["finalize"] = finalize
if self.metadata:
try:
params = inspect.signature(self.adapter.edit_message).parameters
if "metadata" in params or any(
param.kind is inspect.Parameter.VAR_KEYWORD
for param in params.values()
):
kwargs["metadata"] = self.metadata
except (TypeError, ValueError):
pass
return await self.adapter.edit_message(**kwargs)
def on_segment_break(self) -> None:
"""Finalize the current stream segment and start a fresh message."""
self._queue.put(_NEW_SEGMENT)
@ -733,8 +763,7 @@ class GatewayStreamConsumer:
):
clean_text = self._last_sent_text[:-len(self.cfg.cursor)]
try:
result = await self.adapter.edit_message(
chat_id=self.chat_id,
result = await self._edit_message(
message_id=self._message_id,
content=clean_text,
)
@ -959,8 +988,7 @@ class GatewayStreamConsumer:
if not prefix or not prefix.strip():
return
try:
await self.adapter.edit_message(
chat_id=self.chat_id,
await self._edit_message(
message_id=self._message_id,
content=prefix,
)
@ -1167,8 +1195,7 @@ class GatewayStreamConsumer:
):
return True
# Edit existing message
result = await self.adapter.edit_message(
chat_id=self.chat_id,
result = await self._edit_message(
message_id=self._message_id,
content=text,
finalize=finalize,

View file

@ -99,6 +99,21 @@ class SmallLimitProgressAdapter(ProgressCaptureAdapter):
return SendResult(success=True, message_id=message_id)
class MetadataEditProgressCaptureAdapter(ProgressCaptureAdapter):
async def edit_message(
self, chat_id, message_id, content, *, finalize: bool = False, metadata=None
) -> SendResult:
self.edits.append(
{
"chat_id": chat_id,
"message_id": message_id,
"content": content,
"metadata": metadata,
}
)
return SendResult(success=True, message_id=message_id)
class NonEditingProgressCaptureAdapter(ProgressCaptureAdapter):
SUPPORTS_MESSAGE_EDITING = False
@ -277,6 +292,44 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa
assert all(call["metadata"] == {"thread_id": "17585"} for call in adapter.typing)
@pytest.mark.asyncio
async def test_run_agent_progress_edits_keep_originating_topic_metadata(monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all")
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = FakeAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
adapter = MetadataEditProgressCaptureAdapter()
runner = _make_runner(adapter)
gateway_run = importlib.import_module("gateway.run")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"})
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1001",
chat_type="group",
thread_id="17585",
)
result = await runner._run_agent(
message="hello",
context_prompt="",
history=[],
source=source,
session_id="sess-progress-edit-topic",
session_key="agent:main:telegram:group:-1001:17585",
)
assert result["final_response"] == "done"
assert adapter.edits
assert all(call["metadata"] == {"thread_id": "17585"} for call in adapter.edits)
@pytest.mark.asyncio
async def test_run_agent_progress_does_not_use_event_message_id_for_telegram_dm(monkeypatch, tmp_path):
"""Telegram DM progress must not reuse event message id as thread metadata."""

View file

@ -809,6 +809,33 @@ class TestEditMessageStreamingSafety:
# Continuations were sent threaded as replies for visual grouping.
assert adapter._bot.send_message.await_count == len(result.continuation_message_ids)
@pytest.mark.asyncio
async def test_message_too_long_continuations_preserve_topic_metadata(self):
"""Overflow continuations should stay in the originating Telegram topic."""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="fake-token"))
adapter._bot = MagicMock()
adapter._bot.edit_message_text = AsyncMock()
sent_kwargs = []
async def _fake_send(**kwargs):
sent_kwargs.append(kwargs)
return SimpleNamespace(message_id=1000 + len(sent_kwargs))
adapter._bot.send_message = AsyncMock(side_effect=_fake_send)
result = await adapter.edit_message(
"-100123",
"456",
"x" * 6000,
finalize=False,
metadata={"thread_id": "17585"},
)
assert result.success is True
assert sent_kwargs, "expected at least one overflow continuation"
assert all(kwargs.get("message_thread_id") == 17585 for kwargs in sent_kwargs)
assert sent_kwargs[0]["reply_to_message_id"] == 456
# =========================================================================
# Telegram guest mention gating
# =========================================================================