diff --git a/gateway/run.py b/gateway/run.py index 2eb745f92b..670ec4c869 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3958,7 +3958,7 @@ class GatewayRunner: synth_text = _format_gateway_process_notification(evt) if synth_text: try: - await self._inject_watch_notification(synth_text, event) + await self._inject_watch_notification(synth_text, evt) except Exception as e2: logger.error("Watch notification injection error: %s", e2) except Exception as e: @@ -7452,14 +7452,75 @@ class GatewayRunner: return prefix return user_text - async def _inject_watch_notification(self, synth_text: str, original_event) -> None: + def _build_process_event_source(self, evt: dict): + """Resolve the canonical source for a synthetic background-process event. + + Prefer the persisted session-store origin for the event's session key. + Falling back to the currently active foreground event is what causes + cross-topic bleed, so don't do that. + """ + from gateway.session import SessionSource + + session_key = str(evt.get("session_key") or "").strip() + derived_platform = "" + derived_chat_type = "" + derived_chat_id = "" + + if session_key: + try: + self.session_store._ensure_loaded() + entry = self.session_store._entries.get(session_key) + if entry and getattr(entry, "origin", None): + return entry.origin + except Exception as exc: + logger.debug( + "Synthetic process-event session-store lookup failed for %s: %s", + session_key, + exc, + ) + + parts = session_key.split(":") + if len(parts) >= 5 and parts[0] == "agent" and parts[1] == "main": + derived_platform = parts[2] + derived_chat_type = parts[3] + derived_chat_id = parts[4] + + platform_name = str(evt.get("platform") or derived_platform or "").strip().lower() + chat_type = str(evt.get("chat_type") or derived_chat_type or "").strip().lower() + chat_id = str(evt.get("chat_id") or derived_chat_id or "").strip() + if not platform_name or not chat_type or not chat_id: + return None + + try: + platform = Platform(platform_name) + except Exception: + logger.warning( + "Synthetic process event has invalid platform metadata: %r", + platform_name, + ) + return None + + return SessionSource( + platform=platform, + chat_id=chat_id, + chat_type=chat_type, + thread_id=str(evt.get("thread_id") or "").strip() or None, + user_id=str(evt.get("user_id") or "").strip() or None, + user_name=str(evt.get("user_name") or "").strip() or None, + ) + + async def _inject_watch_notification(self, synth_text: str, evt: dict) -> None: """Inject a watch-pattern notification as a synthetic message event. - Uses the source from the original user event to route the notification - back to the correct chat/adapter. + Routing must come from the queued watch event itself, not from whatever + foreground message happened to be active when the queue was drained. """ - source = getattr(original_event, "source", None) + source = self._build_process_event_source(evt) if not source: + logger.warning( + "Dropping watch notification with no routing metadata for process %s", + evt.get("session_id", "unknown"), + ) return platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform) adapter = None @@ -7477,7 +7538,12 @@ class GatewayRunner: source=source, internal=True, ) - logger.info("Watch pattern notification — injecting for %s", platform_name) + logger.info( + "Watch pattern notification — injecting for %s chat=%s thread=%s", + platform_name, + source.chat_id, + source.thread_id, + ) await adapter.handle_message(synth_event) except Exception as e: logger.error("Watch notification injection error: %s", e) @@ -7547,33 +7613,42 @@ class GatewayRunner: f"Command: {session.command}\n" f"Output:\n{_out}]" ) + source = self._build_process_event_source({ + "session_id": session_id, + "session_key": session_key, + "platform": platform_name, + "chat_id": chat_id, + "thread_id": thread_id, + "user_id": user_id, + "user_name": user_name, + }) + if not source: + logger.warning( + "Dropping completion notification with no routing metadata for process %s", + session_id, + ) + break + adapter = None for p, a in self.adapters.items(): - if p.value == platform_name: + if p == source.platform: adapter = a break - if adapter and chat_id: + if adapter and source.chat_id: try: from gateway.platforms.base import MessageEvent, MessageType - from gateway.session import SessionSource - from gateway.config import Platform - _platform_enum = Platform(platform_name) - _source = SessionSource( - platform=_platform_enum, - chat_id=chat_id, - thread_id=thread_id or None, - user_id=user_id or None, - user_name=user_name or None, - ) synth_event = MessageEvent( text=synth_text, message_type=MessageType.TEXT, - source=_source, + source=source, internal=True, ) logger.info( - "Process %s finished — injecting agent notification for session %s", - session_id, session_key, + "Process %s finished — injecting agent notification for session %s chat=%s thread=%s", + session_id, + session_key, + source.chat_id, + source.thread_id, ) await adapter.handle_message(synth_event) except Exception as e: diff --git a/tests/gateway/test_background_process_notifications.py b/tests/gateway/test_background_process_notifications.py index 9c1404f89c..90e9e063a1 100644 --- a/tests/gateway/test_background_process_notifications.py +++ b/tests/gateway/test_background_process_notifications.py @@ -45,7 +45,7 @@ def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner: monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) runner = GatewayRunner(GatewayConfig()) - adapter = SimpleNamespace(send=AsyncMock()) + adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock()) runner.adapters[Platform.TELEGRAM] = adapter return runner @@ -243,3 +243,62 @@ async def test_no_thread_id_sends_no_metadata(monkeypatch, tmp_path): assert adapter.send.await_count == 1 _, kwargs = adapter.send.call_args assert kwargs["metadata"] is None + + +@pytest.mark.asyncio +async def test_inject_watch_notification_routes_from_session_store_origin(monkeypatch, tmp_path): + from gateway.session import SessionSource + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="123", + user_name="Emiliyan", + ) + ) + + evt = { + "session_id": "proc_watch", + "session_key": "agent:main:telegram:group:-100:42", + } + + await runner._inject_watch_notification("[SYSTEM: Background process matched]", evt) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + assert synth_event.internal is True + assert synth_event.source.platform == Platform.TELEGRAM + assert synth_event.source.chat_id == "-100" + assert synth_event.source.chat_type == "group" + assert synth_event.source.thread_id == "42" + assert synth_event.source.user_id == "123" + assert synth_event.source.user_name == "Emiliyan" + + +def test_build_process_event_source_falls_back_to_session_key_chat_type(monkeypatch, tmp_path): + runner = _build_runner(monkeypatch, tmp_path, "all") + + evt = { + "session_id": "proc_watch", + "session_key": "agent:main:telegram:group:-100:42", + "platform": "telegram", + "chat_id": "-100", + "thread_id": "42", + "user_id": "123", + "user_name": "Emiliyan", + } + + source = runner._build_process_event_source(evt) + + assert source is not None + assert source.platform == Platform.TELEGRAM + assert source.chat_id == "-100" + assert source.chat_type == "group" + assert source.thread_id == "42" + assert source.user_id == "123" + assert source.user_name == "Emiliyan" diff --git a/tests/gateway/test_internal_event_bypass_pairing.py b/tests/gateway/test_internal_event_bypass_pairing.py index 1c3f9f0c94..d10195b2d5 100644 --- a/tests/gateway/test_internal_event_bypass_pairing.py +++ b/tests/gateway/test_internal_event_bypass_pairing.py @@ -230,6 +230,59 @@ async def test_notify_on_complete_preserves_user_identity(monkeypatch, tmp_path) assert event.source.user_name == "alice" +@pytest.mark.asyncio +async def test_notify_on_complete_uses_session_store_origin_for_group_topic(monkeypatch, tmp_path): + import tools.process_registry as pr_module + from gateway.session import SessionSource + + sessions = [ + SimpleNamespace( + output_buffer="done\n", exited=True, exit_code=0, command="echo test" + ), + ] + monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions)) + + async def _instant_sleep(*_a, **_kw): + pass + monkeypatch.setattr(asyncio, "sleep", _instant_sleep) + + runner = GatewayRunner(GatewayConfig()) + adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock()) + runner.adapters[Platform.TELEGRAM] = adapter + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="user-42", + user_name="alice", + ) + ) + + watcher = { + "session_id": "proc_test_internal", + "check_interval": 0, + "session_key": "agent:main:telegram:group:-100:42", + "platform": "telegram", + "chat_id": "-100", + "thread_id": "42", + "notify_on_complete": True, + } + + await runner._run_process_watcher(watcher) + + assert adapter.handle_message.await_count == 1 + event = adapter.handle_message.await_args.args[0] + assert event.internal is True + assert event.source.platform == Platform.TELEGRAM + assert event.source.chat_id == "-100" + assert event.source.chat_type == "group" + assert event.source.thread_id == "42" + assert event.source.user_id == "user-42" + assert event.source.user_name == "alice" + + @pytest.mark.asyncio async def test_none_user_id_skips_pairing(monkeypatch, tmp_path): """A non-internal event with user_id=None should be silently dropped.""" diff --git a/tests/tools/test_watch_patterns.py b/tests/tools/test_watch_patterns.py index e31844f9f6..0621edc14d 100644 --- a/tests/tools/test_watch_patterns.py +++ b/tests/tools/test_watch_patterns.py @@ -92,6 +92,25 @@ class TestCheckWatchPatterns: assert "disk full" in evt["output"] assert evt["session_id"] == "proc_test_watch" + def test_match_carries_session_key_and_watcher_routing_metadata(self, registry): + session = _make_session(watch_patterns=["ERROR"]) + session.session_key = "agent:main:telegram:group:-100:42" + session.watcher_platform = "telegram" + session.watcher_chat_id = "-100" + session.watcher_user_id = "u123" + session.watcher_user_name = "alice" + session.watcher_thread_id = "42" + + registry._check_watch_patterns(session, "ERROR: disk full\n") + evt = registry.completion_queue.get_nowait() + + assert evt["session_key"] == "agent:main:telegram:group:-100:42" + assert evt["platform"] == "telegram" + assert evt["chat_id"] == "-100" + assert evt["user_id"] == "u123" + assert evt["user_name"] == "alice" + assert evt["thread_id"] == "42" + def test_multiple_patterns(self, registry): """First matching pattern is reported.""" session = _make_session(watch_patterns=["WARN", "ERROR"]) diff --git a/tools/process_registry.py b/tools/process_registry.py index a5dbc3b1bd..3a274eaa3d 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -191,9 +191,15 @@ class ProcessRegistry: session._watch_disabled = True self.completion_queue.put({ "session_id": session.id, + "session_key": session.session_key, "command": session.command, "type": "watch_disabled", "suppressed": session._watch_suppressed, + "platform": session.watcher_platform, + "chat_id": session.watcher_chat_id, + "user_id": session.watcher_user_id, + "user_name": session.watcher_user_name, + "thread_id": session.watcher_thread_id, "message": ( f"Watch patterns disabled for process {session.id} — " f"too many matches ({session._watch_suppressed} suppressed). " @@ -219,11 +225,17 @@ class ProcessRegistry: self.completion_queue.put({ "session_id": session.id, + "session_key": session.session_key, "command": session.command, "type": "watch_match", "pattern": matched_pattern, "output": output, "suppressed": suppressed, + "platform": session.watcher_platform, + "chat_id": session.watcher_chat_id, + "user_id": session.watcher_user_id, + "user_name": session.watcher_user_name, + "thread_id": session.watcher_thread_id, }) @staticmethod