fix(gateway): route background-process notifications into Telegram DM topics

Background-process completion notifications (notify_on_complete) and
watch-pattern notifications were always delivered to the Telegram main
chat instead of the originating private-chat topic.

Hermes-created Telegram DM topic lanes only render a send when it carries
both message_thread_id and a reply anchor. The synthetic MessageEvent
injected on process completion had no message_id, so _reply_anchor_for_event
returned None and _thread_kwargs_for_send dropped message_thread_id
entirely — routing the notification to the main chat.

Capture the triggering message id at spawn time and thread it through to
the synthetic event so it can be reply-anchored back into the topic:

- session_context: add HERMES_SESSION_MESSAGE_ID context var
- telegram adapter: populate SessionSource.message_id on inbound messages
- terminal tool: persist watcher_message_id on the process session
- process registry: carry/persist message_id on watcher dicts + checkpoint
- gateway: set MessageEvent.message_id on injected notifications

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Fábio Siqueira 2026-05-16 22:31:15 -03:00 committed by Teknium
parent 17f3254ede
commit fbabd560ff
6 changed files with 130 additions and 0 deletions

View file

@ -4821,6 +4821,7 @@ class TelegramAdapter(BasePlatformAdapter):
user_name=user.full_name if user else (chat.full_name if hasattr(chat, "full_name") and chat_type == "dm" else None),
thread_id=thread_id_str,
chat_topic=chat_topic,
message_id=str(message.message_id),
)
# Extract reply context if this message is a reply.

View file

@ -13754,6 +13754,7 @@ class GatewayRunner:
user_id=str(context.source.user_id) if context.source.user_id else "",
user_name=str(context.source.user_name) if context.source.user_name else "",
session_key=context.session_key,
message_id=str(context.source.message_id) if context.source.message_id else "",
)
def _clear_session_env(self, tokens: list) -> None:
@ -14042,6 +14043,7 @@ class GatewayRunner:
message_type=MessageType.TEXT,
source=source,
internal=True,
message_id=str(evt.get("message_id") or "").strip() or None,
)
logger.info(
"Watch pattern notification — injecting for %s chat=%s thread=%s",
@ -14076,6 +14078,7 @@ class GatewayRunner:
thread_id = watcher.get("thread_id", "")
user_id = watcher.get("user_id", "")
user_name = watcher.get("user_name", "")
message_id = str(watcher.get("message_id") or "").strip() or None
agent_notify = watcher.get("notify_on_complete", False)
notify_mode = self._load_background_notifications_mode()
@ -14158,6 +14161,7 @@ class GatewayRunner:
message_type=MessageType.TEXT,
source=source,
internal=True,
message_id=message_id,
)
logger.info(
"Process %s finished — injecting agent notification for session %s chat=%s thread=%s",

View file

@ -56,6 +56,10 @@ _SESSION_USER_ID: ContextVar = ContextVar("HERMES_SESSION_USER_ID", default=_UNS
_SESSION_USER_NAME: ContextVar = ContextVar("HERMES_SESSION_USER_NAME", default=_UNSET)
_SESSION_KEY: ContextVar = ContextVar("HERMES_SESSION_KEY", default=_UNSET)
_SESSION_ID: ContextVar = ContextVar("HERMES_SESSION_ID", default=_UNSET)
# ID of the message that triggered the current turn. Used as a reply anchor
# so background-process notifications stay inside the originating Telegram
# private-chat topic (those lanes route only with thread id + reply anchor).
_SESSION_MESSAGE_ID: ContextVar = ContextVar("HERMES_SESSION_MESSAGE_ID", default=_UNSET)
# Cron auto-delivery vars — set per-job in run_job() so concurrent jobs
# don't clobber each other's delivery targets.
@ -72,6 +76,7 @@ _VAR_MAP = {
"HERMES_SESSION_USER_NAME": _SESSION_USER_NAME,
"HERMES_SESSION_KEY": _SESSION_KEY,
"HERMES_SESSION_ID": _SESSION_ID,
"HERMES_SESSION_MESSAGE_ID": _SESSION_MESSAGE_ID,
"HERMES_CRON_AUTO_DELIVER_PLATFORM": _CRON_AUTO_DELIVER_PLATFORM,
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": _CRON_AUTO_DELIVER_CHAT_ID,
"HERMES_CRON_AUTO_DELIVER_THREAD_ID": _CRON_AUTO_DELIVER_THREAD_ID,
@ -86,6 +91,7 @@ def set_session_vars(
user_id: str = "",
user_name: str = "",
session_key: str = "",
message_id: str = "",
) -> list:
"""Set all session context variables and return reset tokens.
@ -103,6 +109,7 @@ def set_session_vars(
_SESSION_USER_ID.set(user_id),
_SESSION_USER_NAME.set(user_name),
_SESSION_KEY.set(session_key),
_SESSION_MESSAGE_ID.set(message_id),
]
return tokens
@ -126,6 +133,7 @@ def clear_session_vars(tokens: list) -> None:
_SESSION_USER_ID,
_SESSION_USER_NAME,
_SESSION_KEY,
_SESSION_MESSAGE_ID,
):
var.set("")

View file

@ -32,6 +32,9 @@ class _FakeRegistry:
return self._sessions.pop(0)
return None
def is_completion_consumed(self, session_id):
return False
def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner:
"""Create a GatewayRunner with a fake config for the given mode."""
@ -280,6 +283,111 @@ async def test_inject_watch_notification_routes_from_session_store_origin(monkey
assert synth_event.source.user_name == "Emiliyan"
@pytest.mark.asyncio
async def test_agent_notification_carries_message_id_reply_anchor(monkeypatch, tmp_path):
"""notify_on_complete injection carries the triggering message_id so the
synthetic event can be reply-anchored back into a Telegram DM topic.
Without an anchor, Telegram private-chat topic sends fall back to the main
chat (see _thread_kwargs_for_send / telegram_dm_topic_reply_fallback)."""
import tools.process_registry as pr_module
sessions = [SimpleNamespace(
output_buffer="SMOKE_OK\n", exited=True, exit_code=0, command="sleep 1",
)]
monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions))
async def _instant_sleep(*_a, **_kw):
pass
monkeypatch.setattr(asyncio, "sleep", _instant_sleep)
runner = _build_runner(monkeypatch, tmp_path, "all")
adapter = runner.adapters[Platform.TELEGRAM]
watcher = {
"session_id": "proc_anchor",
"check_interval": 0,
"session_key": "agent:main:telegram:dm:123:24296",
"platform": "telegram",
"chat_id": "123",
"thread_id": "24296",
"message_id": "555",
"notify_on_complete": True,
}
await runner._run_process_watcher(watcher)
adapter.handle_message.assert_awaited_once()
synth_event = adapter.handle_message.await_args.args[0]
assert synth_event.internal is True
assert synth_event.message_id == "555"
assert synth_event.source.thread_id == "24296"
@pytest.mark.asyncio
async def test_agent_notification_no_message_id_is_tolerated(monkeypatch, tmp_path):
"""A watcher dict without message_id (CLI spawn, pre-upgrade checkpoint)
still injects message_id is simply None."""
import tools.process_registry as pr_module
sessions = [SimpleNamespace(
output_buffer="done\n", exited=True, exit_code=0, command="sleep 1",
)]
monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions))
async def _instant_sleep(*_a, **_kw):
pass
monkeypatch.setattr(asyncio, "sleep", _instant_sleep)
runner = _build_runner(monkeypatch, tmp_path, "all")
adapter = runner.adapters[Platform.TELEGRAM]
watcher = {
"session_id": "proc_anchorless",
"check_interval": 0,
"session_key": "agent:main:telegram:dm:123:24296",
"platform": "telegram",
"chat_id": "123",
"thread_id": "24296",
"notify_on_complete": True,
}
await runner._run_process_watcher(watcher)
adapter.handle_message.assert_awaited_once()
synth_event = adapter.handle_message.await_args.args[0]
assert synth_event.message_id is None
@pytest.mark.asyncio
async def test_inject_watch_notification_carries_message_id_reply_anchor(monkeypatch, tmp_path):
from gateway.session import SessionSource
runner = _build_runner(monkeypatch, tmp_path, "all")
adapter = runner.adapters[Platform.TELEGRAM]
runner.session_store._entries["agent:main:telegram:dm:123:24296"] = SimpleNamespace(
origin=SessionSource(
platform=Platform.TELEGRAM,
chat_id="123",
chat_type="dm",
thread_id="24296",
user_id="1",
user_name="Fabio",
)
)
evt = {
"session_id": "proc_watch",
"session_key": "agent:main:telegram:dm:123:24296",
"message_id": "777",
}
await runner._inject_watch_notification("[SYSTEM: Background process matched]", evt)
adapter.handle_message.assert_awaited_once()
synth_event = adapter.handle_message.await_args.args[0]
assert synth_event.message_id == "777"
assert synth_event.source.thread_id == "24296"
def test_build_process_event_source_falls_back_to_session_key_chat_type(monkeypatch, tmp_path):
runner = _build_runner(monkeypatch, tmp_path, "all")

View file

@ -109,6 +109,7 @@ class ProcessSession:
watcher_user_id: str = ""
watcher_user_name: str = ""
watcher_thread_id: str = ""
watcher_message_id: str = "" # Triggering message id — reply anchor for topic routing
watcher_interval: int = 0 # 0 = no watcher configured
notify_on_complete: bool = False # Queue agent notification on exit
# Watch patterns — trigger agent notification when output matches any pattern
@ -278,6 +279,7 @@ class ProcessRegistry:
"user_id": session.watcher_user_id,
"user_name": session.watcher_user_name,
"thread_id": session.watcher_thread_id,
"message_id": session.watcher_message_id,
"message": (
f"Watch patterns disabled for process {session.id}"
f"{WATCH_STRIKE_LIMIT} consecutive rate-limit windows triggered "
@ -310,6 +312,7 @@ class ProcessRegistry:
"user_id": session.watcher_user_id,
"user_name": session.watcher_user_name,
"thread_id": session.watcher_thread_id,
"message_id": session.watcher_message_id,
})
def _global_watch_admit(self, now: float) -> bool:
@ -1314,6 +1317,7 @@ class ProcessRegistry:
"watcher_user_id": s.watcher_user_id,
"watcher_user_name": s.watcher_user_name,
"watcher_thread_id": s.watcher_thread_id,
"watcher_message_id": s.watcher_message_id,
"watcher_interval": s.watcher_interval,
"notify_on_complete": s.notify_on_complete,
"watch_patterns": s.watch_patterns,
@ -1377,6 +1381,7 @@ class ProcessRegistry:
watcher_user_id=entry.get("watcher_user_id", ""),
watcher_user_name=entry.get("watcher_user_name", ""),
watcher_thread_id=entry.get("watcher_thread_id", ""),
watcher_message_id=entry.get("watcher_message_id", ""),
watcher_interval=entry.get("watcher_interval", 0),
notify_on_complete=entry.get("notify_on_complete", False),
watch_patterns=entry.get("watch_patterns", []),
@ -1397,6 +1402,7 @@ class ProcessRegistry:
"user_id": session.watcher_user_id,
"user_name": session.watcher_user_name,
"thread_id": session.watcher_thread_id,
"message_id": session.watcher_message_id,
"notify_on_complete": session.notify_on_complete,
})

View file

@ -1970,11 +1970,13 @@ def terminal_tool(
_gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "")
_gw_user_id = _gse("HERMES_SESSION_USER_ID", "")
_gw_user_name = _gse("HERMES_SESSION_USER_NAME", "")
_gw_message_id = _gse("HERMES_SESSION_MESSAGE_ID", "")
proc_session.watcher_platform = _gw_platform
proc_session.watcher_chat_id = _gw_chat_id
proc_session.watcher_user_id = _gw_user_id
proc_session.watcher_user_name = _gw_user_name
proc_session.watcher_thread_id = _gw_thread_id
proc_session.watcher_message_id = _gw_message_id
# Mutual exclusion: if both notify_on_complete and watch_patterns
# are set, drop watch_patterns. The combination produces duplicate
@ -2011,6 +2013,7 @@ def terminal_tool(
"user_id": proc_session.watcher_user_id,
"user_name": proc_session.watcher_user_name,
"thread_id": proc_session.watcher_thread_id,
"message_id": proc_session.watcher_message_id,
"notify_on_complete": True,
})