diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index af66248d0e7..49d96bbb145 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -4821,6 +4821,7 @@ class TelegramAdapter(BasePlatformAdapter): user_name=user.full_name if user else (chat.full_name if hasattr(chat, "full_name") and chat_type == "dm" else None), thread_id=thread_id_str, chat_topic=chat_topic, + message_id=str(message.message_id), ) # Extract reply context if this message is a reply. diff --git a/gateway/run.py b/gateway/run.py index 2455a70c509..4f6a45d1e6f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -13754,6 +13754,7 @@ class GatewayRunner: user_id=str(context.source.user_id) if context.source.user_id else "", user_name=str(context.source.user_name) if context.source.user_name else "", session_key=context.session_key, + message_id=str(context.source.message_id) if context.source.message_id else "", ) def _clear_session_env(self, tokens: list) -> None: @@ -14042,6 +14043,7 @@ class GatewayRunner: message_type=MessageType.TEXT, source=source, internal=True, + message_id=str(evt.get("message_id") or "").strip() or None, ) logger.info( "Watch pattern notification — injecting for %s chat=%s thread=%s", @@ -14076,6 +14078,7 @@ class GatewayRunner: thread_id = watcher.get("thread_id", "") user_id = watcher.get("user_id", "") user_name = watcher.get("user_name", "") + message_id = str(watcher.get("message_id") or "").strip() or None agent_notify = watcher.get("notify_on_complete", False) notify_mode = self._load_background_notifications_mode() @@ -14158,6 +14161,7 @@ class GatewayRunner: message_type=MessageType.TEXT, source=source, internal=True, + message_id=message_id, ) logger.info( "Process %s finished — injecting agent notification for session %s chat=%s thread=%s", diff --git a/gateway/session_context.py b/gateway/session_context.py index b64f31de081..486949fae3d 100644 --- a/gateway/session_context.py +++ b/gateway/session_context.py @@ -56,6 +56,10 @@ _SESSION_USER_ID: ContextVar = ContextVar("HERMES_SESSION_USER_ID", default=_UNS _SESSION_USER_NAME: ContextVar = ContextVar("HERMES_SESSION_USER_NAME", default=_UNSET) _SESSION_KEY: ContextVar = ContextVar("HERMES_SESSION_KEY", default=_UNSET) _SESSION_ID: ContextVar = ContextVar("HERMES_SESSION_ID", default=_UNSET) +# ID of the message that triggered the current turn. Used as a reply anchor +# so background-process notifications stay inside the originating Telegram +# private-chat topic (those lanes route only with thread id + reply anchor). +_SESSION_MESSAGE_ID: ContextVar = ContextVar("HERMES_SESSION_MESSAGE_ID", default=_UNSET) # Cron auto-delivery vars — set per-job in run_job() so concurrent jobs # don't clobber each other's delivery targets. @@ -72,6 +76,7 @@ _VAR_MAP = { "HERMES_SESSION_USER_NAME": _SESSION_USER_NAME, "HERMES_SESSION_KEY": _SESSION_KEY, "HERMES_SESSION_ID": _SESSION_ID, + "HERMES_SESSION_MESSAGE_ID": _SESSION_MESSAGE_ID, "HERMES_CRON_AUTO_DELIVER_PLATFORM": _CRON_AUTO_DELIVER_PLATFORM, "HERMES_CRON_AUTO_DELIVER_CHAT_ID": _CRON_AUTO_DELIVER_CHAT_ID, "HERMES_CRON_AUTO_DELIVER_THREAD_ID": _CRON_AUTO_DELIVER_THREAD_ID, @@ -86,6 +91,7 @@ def set_session_vars( user_id: str = "", user_name: str = "", session_key: str = "", + message_id: str = "", ) -> list: """Set all session context variables and return reset tokens. @@ -103,6 +109,7 @@ def set_session_vars( _SESSION_USER_ID.set(user_id), _SESSION_USER_NAME.set(user_name), _SESSION_KEY.set(session_key), + _SESSION_MESSAGE_ID.set(message_id), ] return tokens @@ -126,6 +133,7 @@ def clear_session_vars(tokens: list) -> None: _SESSION_USER_ID, _SESSION_USER_NAME, _SESSION_KEY, + _SESSION_MESSAGE_ID, ): var.set("") diff --git a/tests/gateway/test_background_process_notifications.py b/tests/gateway/test_background_process_notifications.py index 77bf7bcc18c..412b780bb6f 100644 --- a/tests/gateway/test_background_process_notifications.py +++ b/tests/gateway/test_background_process_notifications.py @@ -32,6 +32,9 @@ class _FakeRegistry: return self._sessions.pop(0) return None + def is_completion_consumed(self, session_id): + return False + def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner: """Create a GatewayRunner with a fake config for the given mode.""" @@ -280,6 +283,111 @@ async def test_inject_watch_notification_routes_from_session_store_origin(monkey assert synth_event.source.user_name == "Emiliyan" +@pytest.mark.asyncio +async def test_agent_notification_carries_message_id_reply_anchor(monkeypatch, tmp_path): + """notify_on_complete injection carries the triggering message_id so the + synthetic event can be reply-anchored back into a Telegram DM topic. + + Without an anchor, Telegram private-chat topic sends fall back to the main + chat (see _thread_kwargs_for_send / telegram_dm_topic_reply_fallback).""" + import tools.process_registry as pr_module + + sessions = [SimpleNamespace( + output_buffer="SMOKE_OK\n", exited=True, exit_code=0, command="sleep 1", + )] + monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions)) + + async def _instant_sleep(*_a, **_kw): + pass + monkeypatch.setattr(asyncio, "sleep", _instant_sleep) + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + + watcher = { + "session_id": "proc_anchor", + "check_interval": 0, + "session_key": "agent:main:telegram:dm:123:24296", + "platform": "telegram", + "chat_id": "123", + "thread_id": "24296", + "message_id": "555", + "notify_on_complete": True, + } + await runner._run_process_watcher(watcher) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + assert synth_event.internal is True + assert synth_event.message_id == "555" + assert synth_event.source.thread_id == "24296" + + +@pytest.mark.asyncio +async def test_agent_notification_no_message_id_is_tolerated(monkeypatch, tmp_path): + """A watcher dict without message_id (CLI spawn, pre-upgrade checkpoint) + still injects — message_id is simply None.""" + import tools.process_registry as pr_module + + sessions = [SimpleNamespace( + output_buffer="done\n", exited=True, exit_code=0, command="sleep 1", + )] + monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions)) + + async def _instant_sleep(*_a, **_kw): + pass + monkeypatch.setattr(asyncio, "sleep", _instant_sleep) + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + + watcher = { + "session_id": "proc_anchorless", + "check_interval": 0, + "session_key": "agent:main:telegram:dm:123:24296", + "platform": "telegram", + "chat_id": "123", + "thread_id": "24296", + "notify_on_complete": True, + } + await runner._run_process_watcher(watcher) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + assert synth_event.message_id is None + + +@pytest.mark.asyncio +async def test_inject_watch_notification_carries_message_id_reply_anchor(monkeypatch, tmp_path): + from gateway.session import SessionSource + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + runner.session_store._entries["agent:main:telegram:dm:123:24296"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="123", + chat_type="dm", + thread_id="24296", + user_id="1", + user_name="Fabio", + ) + ) + + evt = { + "session_id": "proc_watch", + "session_key": "agent:main:telegram:dm:123:24296", + "message_id": "777", + } + + await runner._inject_watch_notification("[SYSTEM: Background process matched]", evt) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + assert synth_event.message_id == "777" + assert synth_event.source.thread_id == "24296" + + def test_build_process_event_source_falls_back_to_session_key_chat_type(monkeypatch, tmp_path): runner = _build_runner(monkeypatch, tmp_path, "all") diff --git a/tools/process_registry.py b/tools/process_registry.py index 4ffa9923af0..aa02a05038f 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -109,6 +109,7 @@ class ProcessSession: watcher_user_id: str = "" watcher_user_name: str = "" watcher_thread_id: str = "" + watcher_message_id: str = "" # Triggering message id — reply anchor for topic routing watcher_interval: int = 0 # 0 = no watcher configured notify_on_complete: bool = False # Queue agent notification on exit # Watch patterns — trigger agent notification when output matches any pattern @@ -278,6 +279,7 @@ class ProcessRegistry: "user_id": session.watcher_user_id, "user_name": session.watcher_user_name, "thread_id": session.watcher_thread_id, + "message_id": session.watcher_message_id, "message": ( f"Watch patterns disabled for process {session.id} — " f"{WATCH_STRIKE_LIMIT} consecutive rate-limit windows triggered " @@ -310,6 +312,7 @@ class ProcessRegistry: "user_id": session.watcher_user_id, "user_name": session.watcher_user_name, "thread_id": session.watcher_thread_id, + "message_id": session.watcher_message_id, }) def _global_watch_admit(self, now: float) -> bool: @@ -1314,6 +1317,7 @@ class ProcessRegistry: "watcher_user_id": s.watcher_user_id, "watcher_user_name": s.watcher_user_name, "watcher_thread_id": s.watcher_thread_id, + "watcher_message_id": s.watcher_message_id, "watcher_interval": s.watcher_interval, "notify_on_complete": s.notify_on_complete, "watch_patterns": s.watch_patterns, @@ -1377,6 +1381,7 @@ class ProcessRegistry: watcher_user_id=entry.get("watcher_user_id", ""), watcher_user_name=entry.get("watcher_user_name", ""), watcher_thread_id=entry.get("watcher_thread_id", ""), + watcher_message_id=entry.get("watcher_message_id", ""), watcher_interval=entry.get("watcher_interval", 0), notify_on_complete=entry.get("notify_on_complete", False), watch_patterns=entry.get("watch_patterns", []), @@ -1397,6 +1402,7 @@ class ProcessRegistry: "user_id": session.watcher_user_id, "user_name": session.watcher_user_name, "thread_id": session.watcher_thread_id, + "message_id": session.watcher_message_id, "notify_on_complete": session.notify_on_complete, }) diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 2c522fe7571..387e27881ad 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1970,11 +1970,13 @@ def terminal_tool( _gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "") _gw_user_id = _gse("HERMES_SESSION_USER_ID", "") _gw_user_name = _gse("HERMES_SESSION_USER_NAME", "") + _gw_message_id = _gse("HERMES_SESSION_MESSAGE_ID", "") proc_session.watcher_platform = _gw_platform proc_session.watcher_chat_id = _gw_chat_id proc_session.watcher_user_id = _gw_user_id proc_session.watcher_user_name = _gw_user_name proc_session.watcher_thread_id = _gw_thread_id + proc_session.watcher_message_id = _gw_message_id # Mutual exclusion: if both notify_on_complete and watch_patterns # are set, drop watch_patterns. The combination produces duplicate @@ -2011,6 +2013,7 @@ def terminal_tool( "user_id": proc_session.watcher_user_id, "user_name": proc_session.watcher_user_name, "thread_id": proc_session.watcher_thread_id, + "message_id": proc_session.watcher_message_id, "notify_on_complete": True, })