diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 46aeb14b3d8..1570eb6973b 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -1839,6 +1839,7 @@ class TelegramAdapter(BasePlatformAdapter): content: str, *, finalize: bool = False, + metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Edit a previously sent Telegram message. @@ -1857,7 +1858,7 @@ class TelegramAdapter(BasePlatformAdapter): # without round-tripping a doomed edit. if utf16_len(content) > self.MAX_MESSAGE_LENGTH: return await self._edit_overflow_split( - chat_id, message_id, content, finalize=finalize, + chat_id, message_id, content, finalize=finalize, metadata=metadata, ) try: @@ -1902,7 +1903,7 @@ class TelegramAdapter(BasePlatformAdapter): self.name, utf16_len(content), self.MAX_MESSAGE_LENGTH, ) return await self._edit_overflow_split( - chat_id, message_id, content, finalize=finalize, + chat_id, message_id, content, finalize=finalize, metadata=metadata, ) # Flood control / RetryAfter — short waits are retried inline, # long waits return a failure immediately so streaming can fall back @@ -1973,6 +1974,7 @@ class TelegramAdapter(BasePlatformAdapter): content: str, *, finalize: bool, + metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Split an oversized edit across the existing message + continuations. @@ -2044,8 +2046,16 @@ class TelegramAdapter(BasePlatformAdapter): # fallback, mirroring send(). continuation_ids: list[str] = [] prev_id = message_id + thread_id = self._metadata_thread_id(metadata) for chunk in chunks[1:]: sent_msg = None + reply_to_id = int(prev_id) if prev_id else None + thread_kwargs = self._thread_kwargs_for_send( + chat_id, + thread_id, + metadata, + reply_to_message_id=reply_to_id, + ) for use_markdown in (True, False) if finalize else (False,): try: text = self.format_message(chunk) if use_markdown else chunk @@ -2053,16 +2063,31 @@ class TelegramAdapter(BasePlatformAdapter): chat_id=int(chat_id), text=text, parse_mode=ParseMode.MARKDOWN_V2 if use_markdown else None, - reply_to_message_id=int(prev_id) if prev_id else None, + reply_to_message_id=reply_to_id, + **thread_kwargs, + **self._link_preview_kwargs(), + **self._notification_kwargs(metadata), ) break except Exception as send_err: if "reply message not found" in str(send_err).lower(): - # Drop the reply anchor and try again. + # Drop the reply anchor and try again. Private DM + # topic fallback needs the anchor and topic id together; + # forum topics can still safely keep message_thread_id. + retry_thread_kwargs = ( + {} + if metadata and metadata.get("telegram_dm_topic_reply_fallback") + else self._thread_kwargs_for_send( + chat_id, thread_id, metadata, reply_to_message_id=None + ) + ) try: sent_msg = await self._bot.send_message( chat_id=int(chat_id), text=chunk, + **retry_thread_kwargs, + **self._link_preview_kwargs(), + **self._notification_kwargs(metadata), ) break except Exception as _retry_err: diff --git a/gateway/run.py b/gateway/run.py index 480e1c2cd56..91bacfbad2e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -15378,6 +15378,32 @@ class GatewayRunner: _raw_progress_limit - (64 if _raw_progress_limit > 128 else 0), ) + # Detect whether the adapter's edit_message accepts metadata so + # overflow edits preserve Telegram topic/thread routing (#27487). + _edit_accepts_metadata = False + if _progress_metadata: + try: + _edit_params = inspect.signature(adapter.edit_message).parameters + _edit_accepts_metadata = ( + "metadata" in _edit_params + or any( + param.kind is inspect.Parameter.VAR_KEYWORD + for param in _edit_params.values() + ) + ) + except (TypeError, ValueError): + _edit_accepts_metadata = False + + async def _edit_progress_message(message_id: str, content: str): + kwargs = { + "chat_id": source.chat_id, + "message_id": message_id, + "content": content, + } + if _edit_accepts_metadata: + kwargs["metadata"] = _progress_metadata + return await adapter.edit_message(**kwargs) + def _progress_text(lines: list) -> str: return "\n".join(str(line) for line in lines) @@ -15429,11 +15455,7 @@ class GatewayRunner: first_text = _progress_text(groups[0]) if progress_msg_id is not None: - result = await adapter.edit_message( - chat_id=source.chat_id, - message_id=progress_msg_id, - content=first_text, - ) + result = await _edit_progress_message(progress_msg_id, first_text) if not result.success: can_edit = False # Fall back to the existing non-edit behavior below. @@ -15532,11 +15554,7 @@ class GatewayRunner: if can_edit and progress_msg_id is not None: # Try to edit the existing progress message full_text = "\n".join(progress_lines) - result = await adapter.edit_message( - chat_id=source.chat_id, - message_id=progress_msg_id, - content=full_text, - ) + result = await _edit_progress_message(progress_msg_id, full_text) if not result.success: _err = (getattr(result, "error", "") or "").lower() # Transient network errors (ConnectError, timeouts) @@ -15622,11 +15640,7 @@ class GatewayRunner: if can_edit and progress_lines and progress_msg_id: _pending_text = _progress_text(progress_lines) try: - await adapter.edit_message( - chat_id=source.chat_id, - message_id=progress_msg_id, - content=_pending_text, - ) + await _edit_progress_message(progress_msg_id, _pending_text) except Exception: pass progress_msg_id = None @@ -15644,11 +15658,7 @@ class GatewayRunner: if can_edit and progress_lines and progress_msg_id: full_text = _progress_text(progress_lines) try: - await adapter.edit_message( - chat_id=source.chat_id, - message_id=progress_msg_id, - content=full_text, - ) + await _edit_progress_message(progress_msg_id, full_text) except Exception: pass return diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index d802bc00b13..17214050919 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -16,6 +16,7 @@ Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697). from __future__ import annotations import asyncio +import inspect import logging import queue import re @@ -197,6 +198,35 @@ class GatewayStreamConsumer: the subsequent cosmetic edit (cursor removal) failed.""" return self._final_content_delivered + async def _edit_message( + self, + *, + message_id: str, + content: str, + finalize: bool = False, + ): + """Edit via the adapter, passing routing metadata when supported.""" + kwargs = { + "chat_id": self.chat_id, + "message_id": message_id, + "content": content, + } + # Keep the long-standing stream-consumer contract: concrete adapters + # must accept finalize= even when it is False (guarded by tests). + kwargs["finalize"] = finalize + + if self.metadata: + try: + params = inspect.signature(self.adapter.edit_message).parameters + if "metadata" in params or any( + param.kind is inspect.Parameter.VAR_KEYWORD + for param in params.values() + ): + kwargs["metadata"] = self.metadata + except (TypeError, ValueError): + pass + return await self.adapter.edit_message(**kwargs) + def on_segment_break(self) -> None: """Finalize the current stream segment and start a fresh message.""" self._queue.put(_NEW_SEGMENT) @@ -733,8 +763,7 @@ class GatewayStreamConsumer: ): clean_text = self._last_sent_text[:-len(self.cfg.cursor)] try: - result = await self.adapter.edit_message( - chat_id=self.chat_id, + result = await self._edit_message( message_id=self._message_id, content=clean_text, ) @@ -959,8 +988,7 @@ class GatewayStreamConsumer: if not prefix or not prefix.strip(): return try: - await self.adapter.edit_message( - chat_id=self.chat_id, + await self._edit_message( message_id=self._message_id, content=prefix, ) @@ -1167,8 +1195,7 @@ class GatewayStreamConsumer: ): return True # Edit existing message - result = await self.adapter.edit_message( - chat_id=self.chat_id, + result = await self._edit_message( message_id=self._message_id, content=text, finalize=finalize, diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index b16be006eb0..8f218dfc11c 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -99,6 +99,21 @@ class SmallLimitProgressAdapter(ProgressCaptureAdapter): return SendResult(success=True, message_id=message_id) +class MetadataEditProgressCaptureAdapter(ProgressCaptureAdapter): + async def edit_message( + self, chat_id, message_id, content, *, finalize: bool = False, metadata=None + ) -> SendResult: + self.edits.append( + { + "chat_id": chat_id, + "message_id": message_id, + "content": content, + "metadata": metadata, + } + ) + return SendResult(success=True, message_id=message_id) + + class NonEditingProgressCaptureAdapter(ProgressCaptureAdapter): SUPPORTS_MESSAGE_EDITING = False @@ -277,6 +292,44 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa assert all(call["metadata"] == {"thread_id": "17585"} for call in adapter.typing) +@pytest.mark.asyncio +async def test_run_agent_progress_edits_keep_originating_topic_metadata(monkeypatch, tmp_path): + monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = FakeAgent + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + adapter = MetadataEditProgressCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = importlib.import_module("gateway.run") + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", + ) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-progress-edit-topic", + session_key="agent:main:telegram:group:-1001:17585", + ) + + assert result["final_response"] == "done" + assert adapter.edits + assert all(call["metadata"] == {"thread_id": "17585"} for call in adapter.edits) + + @pytest.mark.asyncio async def test_run_agent_progress_does_not_use_event_message_id_for_telegram_dm(monkeypatch, tmp_path): """Telegram DM progress must not reuse event message id as thread metadata.""" diff --git a/tests/gateway/test_telegram_format.py b/tests/gateway/test_telegram_format.py index 90063a01a8b..59832f13e0a 100644 --- a/tests/gateway/test_telegram_format.py +++ b/tests/gateway/test_telegram_format.py @@ -809,6 +809,33 @@ class TestEditMessageStreamingSafety: # Continuations were sent threaded as replies for visual grouping. assert adapter._bot.send_message.await_count == len(result.continuation_message_ids) + @pytest.mark.asyncio + async def test_message_too_long_continuations_preserve_topic_metadata(self): + """Overflow continuations should stay in the originating Telegram topic.""" + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="fake-token")) + adapter._bot = MagicMock() + adapter._bot.edit_message_text = AsyncMock() + sent_kwargs = [] + + async def _fake_send(**kwargs): + sent_kwargs.append(kwargs) + return SimpleNamespace(message_id=1000 + len(sent_kwargs)) + + adapter._bot.send_message = AsyncMock(side_effect=_fake_send) + + result = await adapter.edit_message( + "-100123", + "456", + "x" * 6000, + finalize=False, + metadata={"thread_id": "17585"}, + ) + + assert result.success is True + assert sent_kwargs, "expected at least one overflow continuation" + assert all(kwargs.get("message_thread_id") == 17585 for kwargs in sent_kwargs) + assert sent_kwargs[0]["reply_to_message_id"] == 456 + # ========================================================================= # Telegram guest mention gating # =========================================================================