diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index ddee844f40..c18d3569d8 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -734,25 +734,56 @@ def merge_pending_message_event( pending_messages: Dict[str, MessageEvent], session_key: str, event: MessageEvent, + *, + merge_text: bool = False, ) -> None: """Store or merge a pending event for a session. Photo bursts/albums often arrive as multiple near-simultaneous PHOTO events. Merge those into the existing queued event so the next turn sees - the whole burst, while non-photo follow-ups still replace the pending - event normally. + the whole burst. + + When ``merge_text`` is enabled, rapid follow-up TEXT events are appended + instead of replacing the pending turn. This is used for Telegram bursty + follow-ups so a multi-part user thought is not silently truncated to only + the last queued fragment. """ existing = pending_messages.get(session_key) - if ( - existing - and getattr(existing, "message_type", None) == MessageType.PHOTO - and event.message_type == MessageType.PHOTO - ): - existing.media_urls.extend(event.media_urls) - existing.media_types.extend(event.media_types) - if event.text: - existing.text = BasePlatformAdapter._merge_caption(existing.text, event.text) - return + if existing: + existing_is_photo = getattr(existing, "message_type", None) == MessageType.PHOTO + incoming_is_photo = event.message_type == MessageType.PHOTO + existing_has_media = bool(existing.media_urls) + incoming_has_media = bool(event.media_urls) + + if existing_is_photo and incoming_is_photo: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + if event.text: + existing.text = BasePlatformAdapter._merge_caption(existing.text, event.text) + return + + if existing_has_media or incoming_has_media: + if incoming_has_media: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + if event.text: + if existing.text: + existing.text = BasePlatformAdapter._merge_caption(existing.text, event.text) + else: + existing.text = event.text + if existing_is_photo or incoming_is_photo: + existing.message_type = MessageType.PHOTO + return + + if ( + merge_text + and getattr(existing, "message_type", None) == MessageType.TEXT + and event.message_type == MessageType.TEXT + ): + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + return + pending_messages[session_key] = event diff --git a/gateway/run.py b/gateway/run.py index 4a1539927b..13f4cb6478 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2922,6 +2922,32 @@ class GatewayRunner: merge_pending_message_event(adapter._pending_messages, _quick_key, event) return None + _telegram_followup_grace = float( + os.getenv("HERMES_TELEGRAM_FOLLOWUP_GRACE_SECONDS", "3.0") + ) + _started_at = self._running_agents_ts.get(_quick_key, 0) + if ( + source.platform == Platform.TELEGRAM + and event.message_type == MessageType.TEXT + and _telegram_followup_grace > 0 + and _started_at + and (time.time() - _started_at) <= _telegram_followup_grace + ): + logger.debug( + "Telegram follow-up arrived %.2fs after run start for %s — queueing without interrupt", + time.time() - _started_at, + _quick_key[:20], + ) + adapter = self.adapters.get(source.platform) + if adapter: + merge_pending_message_event( + adapter._pending_messages, + _quick_key, + event, + merge_text=True, + ) + return None + running_agent = self._running_agents.get(_quick_key) if running_agent is _AGENT_PENDING_SENTINEL: # Agent is being set up but not ready yet. @@ -2935,7 +2961,12 @@ class GatewayRunner: # agent starts. adapter = self.adapters.get(source.platform) if adapter: - adapter._pending_messages[_quick_key] = event + merge_pending_message_event( + adapter._pending_messages, + _quick_key, + event, + merge_text=True, + ) return None if self._draining: if self._queue_during_drain_enabled(): diff --git a/tests/gateway/test_session_race_guard.py b/tests/gateway/test_session_race_guard.py index fcfaba784d..d7eeff5c1e 100644 --- a/tests/gateway/test_session_race_guard.py +++ b/tests/gateway/test_session_race_guard.py @@ -14,7 +14,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig -from gateway.platforms.base import MessageEvent, MessageType +from gateway.platforms.base import MessageEvent, MessageType, merge_pending_message_event from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL from gateway.session import SessionSource, build_session_key @@ -184,6 +184,80 @@ async def test_second_message_during_sentinel_queued_not_duplicate(): await task1 +def test_merge_pending_message_event_merges_text_and_photo_followups(): + pending = {} + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="12345", + chat_type="dm", + user_id="u1", + ) + session_key = build_session_key(source) + + text_event = MessageEvent( + text="first follow-up", + message_type=MessageType.TEXT, + source=source, + ) + photo_event = MessageEvent( + text="see screenshot", + message_type=MessageType.PHOTO, + source=source, + media_urls=["/tmp/test.png"], + media_types=["image/png"], + ) + + merge_pending_message_event(pending, session_key, text_event, merge_text=True) + merge_pending_message_event(pending, session_key, photo_event, merge_text=True) + + merged = pending[session_key] + assert merged.message_type == MessageType.PHOTO + assert merged.text == "first follow-up\n\nsee screenshot" + assert merged.media_urls == ["/tmp/test.png"] + assert merged.media_types == ["image/png"] + + +@pytest.mark.asyncio +async def test_recent_telegram_text_followup_is_queued_without_interrupt(): + runner = _make_runner() + event = _make_event(text="follow-up") + session_key = build_session_key(event.source) + + fake_agent = MagicMock() + fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0} + runner._running_agents[session_key] = fake_agent + import time as _time + runner._running_agents_ts[session_key] = _time.time() + + result = await runner._handle_message(event) + + assert result is None + fake_agent.interrupt.assert_not_called() + adapter = runner.adapters[Platform.TELEGRAM] + assert adapter._pending_messages[session_key].text == "follow-up" + + +@pytest.mark.asyncio +async def test_recent_telegram_followups_append_in_pending_queue(): + runner = _make_runner() + first = _make_event(text="part one") + second = _make_event(text="part two") + session_key = build_session_key(first.source) + + fake_agent = MagicMock() + fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0} + runner._running_agents[session_key] = fake_agent + import time as _time + runner._running_agents_ts[session_key] = _time.time() + + await runner._handle_message(first) + await runner._handle_message(second) + + fake_agent.interrupt.assert_not_called() + adapter = runner.adapters[Platform.TELEGRAM] + assert adapter._pending_messages[session_key].text == "part one\npart two" + + # ------------------------------------------------------------------ # Test 5: Sentinel not placed for command messages # ------------------------------------------------------------------ @@ -273,6 +347,7 @@ async def test_stop_hard_kills_running_agent(): # Simulate a running (possibly hung) agent fake_agent = MagicMock() + fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0} runner._running_agents[session_key] = fake_agent # Send /stop @@ -305,6 +380,7 @@ async def test_stop_clears_pending_messages(): ) fake_agent = MagicMock() + fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0} runner._running_agents[session_key] = fake_agent runner._pending_messages[session_key] = "some queued text"