fix(gateway): route synthetic background events by session

This commit is contained in:
etcircle 2026-04-15 16:38:57 +01:00 committed by Teknium
parent da448d4fce
commit dee592a0b1
5 changed files with 240 additions and 22 deletions

View file

@ -3958,7 +3958,7 @@ class GatewayRunner:
synth_text = _format_gateway_process_notification(evt)
if synth_text:
try:
await self._inject_watch_notification(synth_text, event)
await self._inject_watch_notification(synth_text, evt)
except Exception as e2:
logger.error("Watch notification injection error: %s", e2)
except Exception as e:
@ -7452,14 +7452,75 @@ class GatewayRunner:
return prefix
return user_text
async def _inject_watch_notification(self, synth_text: str, original_event) -> None:
def _build_process_event_source(self, evt: dict):
"""Resolve the canonical source for a synthetic background-process event.
Prefer the persisted session-store origin for the event's session key.
Falling back to the currently active foreground event is what causes
cross-topic bleed, so don't do that.
"""
from gateway.session import SessionSource
session_key = str(evt.get("session_key") or "").strip()
derived_platform = ""
derived_chat_type = ""
derived_chat_id = ""
if session_key:
try:
self.session_store._ensure_loaded()
entry = self.session_store._entries.get(session_key)
if entry and getattr(entry, "origin", None):
return entry.origin
except Exception as exc:
logger.debug(
"Synthetic process-event session-store lookup failed for %s: %s",
session_key,
exc,
)
parts = session_key.split(":")
if len(parts) >= 5 and parts[0] == "agent" and parts[1] == "main":
derived_platform = parts[2]
derived_chat_type = parts[3]
derived_chat_id = parts[4]
platform_name = str(evt.get("platform") or derived_platform or "").strip().lower()
chat_type = str(evt.get("chat_type") or derived_chat_type or "").strip().lower()
chat_id = str(evt.get("chat_id") or derived_chat_id or "").strip()
if not platform_name or not chat_type or not chat_id:
return None
try:
platform = Platform(platform_name)
except Exception:
logger.warning(
"Synthetic process event has invalid platform metadata: %r",
platform_name,
)
return None
return SessionSource(
platform=platform,
chat_id=chat_id,
chat_type=chat_type,
thread_id=str(evt.get("thread_id") or "").strip() or None,
user_id=str(evt.get("user_id") or "").strip() or None,
user_name=str(evt.get("user_name") or "").strip() or None,
)
async def _inject_watch_notification(self, synth_text: str, evt: dict) -> None:
"""Inject a watch-pattern notification as a synthetic message event.
Uses the source from the original user event to route the notification
back to the correct chat/adapter.
Routing must come from the queued watch event itself, not from whatever
foreground message happened to be active when the queue was drained.
"""
source = getattr(original_event, "source", None)
source = self._build_process_event_source(evt)
if not source:
logger.warning(
"Dropping watch notification with no routing metadata for process %s",
evt.get("session_id", "unknown"),
)
return
platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform)
adapter = None
@ -7477,7 +7538,12 @@ class GatewayRunner:
source=source,
internal=True,
)
logger.info("Watch pattern notification — injecting for %s", platform_name)
logger.info(
"Watch pattern notification — injecting for %s chat=%s thread=%s",
platform_name,
source.chat_id,
source.thread_id,
)
await adapter.handle_message(synth_event)
except Exception as e:
logger.error("Watch notification injection error: %s", e)
@ -7547,33 +7613,42 @@ class GatewayRunner:
f"Command: {session.command}\n"
f"Output:\n{_out}]"
)
source = self._build_process_event_source({
"session_id": session_id,
"session_key": session_key,
"platform": platform_name,
"chat_id": chat_id,
"thread_id": thread_id,
"user_id": user_id,
"user_name": user_name,
})
if not source:
logger.warning(
"Dropping completion notification with no routing metadata for process %s",
session_id,
)
break
adapter = None
for p, a in self.adapters.items():
if p.value == platform_name:
if p == source.platform:
adapter = a
break
if adapter and chat_id:
if adapter and source.chat_id:
try:
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource
from gateway.config import Platform
_platform_enum = Platform(platform_name)
_source = SessionSource(
platform=_platform_enum,
chat_id=chat_id,
thread_id=thread_id or None,
user_id=user_id or None,
user_name=user_name or None,
)
synth_event = MessageEvent(
text=synth_text,
message_type=MessageType.TEXT,
source=_source,
source=source,
internal=True,
)
logger.info(
"Process %s finished — injecting agent notification for session %s",
session_id, session_key,
"Process %s finished — injecting agent notification for session %s chat=%s thread=%s",
session_id,
session_key,
source.chat_id,
source.thread_id,
)
await adapter.handle_message(synth_event)
except Exception as e:

View file

@ -45,7 +45,7 @@ def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner:
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
runner = GatewayRunner(GatewayConfig())
adapter = SimpleNamespace(send=AsyncMock())
adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock())
runner.adapters[Platform.TELEGRAM] = adapter
return runner
@ -243,3 +243,62 @@ async def test_no_thread_id_sends_no_metadata(monkeypatch, tmp_path):
assert adapter.send.await_count == 1
_, kwargs = adapter.send.call_args
assert kwargs["metadata"] is None
@pytest.mark.asyncio
async def test_inject_watch_notification_routes_from_session_store_origin(monkeypatch, tmp_path):
from gateway.session import SessionSource
runner = _build_runner(monkeypatch, tmp_path, "all")
adapter = runner.adapters[Platform.TELEGRAM]
runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace(
origin=SessionSource(
platform=Platform.TELEGRAM,
chat_id="-100",
chat_type="group",
thread_id="42",
user_id="123",
user_name="Emiliyan",
)
)
evt = {
"session_id": "proc_watch",
"session_key": "agent:main:telegram:group:-100:42",
}
await runner._inject_watch_notification("[SYSTEM: Background process matched]", evt)
adapter.handle_message.assert_awaited_once()
synth_event = adapter.handle_message.await_args.args[0]
assert synth_event.internal is True
assert synth_event.source.platform == Platform.TELEGRAM
assert synth_event.source.chat_id == "-100"
assert synth_event.source.chat_type == "group"
assert synth_event.source.thread_id == "42"
assert synth_event.source.user_id == "123"
assert synth_event.source.user_name == "Emiliyan"
def test_build_process_event_source_falls_back_to_session_key_chat_type(monkeypatch, tmp_path):
runner = _build_runner(monkeypatch, tmp_path, "all")
evt = {
"session_id": "proc_watch",
"session_key": "agent:main:telegram:group:-100:42",
"platform": "telegram",
"chat_id": "-100",
"thread_id": "42",
"user_id": "123",
"user_name": "Emiliyan",
}
source = runner._build_process_event_source(evt)
assert source is not None
assert source.platform == Platform.TELEGRAM
assert source.chat_id == "-100"
assert source.chat_type == "group"
assert source.thread_id == "42"
assert source.user_id == "123"
assert source.user_name == "Emiliyan"

View file

@ -230,6 +230,59 @@ async def test_notify_on_complete_preserves_user_identity(monkeypatch, tmp_path)
assert event.source.user_name == "alice"
@pytest.mark.asyncio
async def test_notify_on_complete_uses_session_store_origin_for_group_topic(monkeypatch, tmp_path):
import tools.process_registry as pr_module
from gateway.session import SessionSource
sessions = [
SimpleNamespace(
output_buffer="done\n", exited=True, exit_code=0, command="echo test"
),
]
monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions))
async def _instant_sleep(*_a, **_kw):
pass
monkeypatch.setattr(asyncio, "sleep", _instant_sleep)
runner = GatewayRunner(GatewayConfig())
adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock())
runner.adapters[Platform.TELEGRAM] = adapter
runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace(
origin=SessionSource(
platform=Platform.TELEGRAM,
chat_id="-100",
chat_type="group",
thread_id="42",
user_id="user-42",
user_name="alice",
)
)
watcher = {
"session_id": "proc_test_internal",
"check_interval": 0,
"session_key": "agent:main:telegram:group:-100:42",
"platform": "telegram",
"chat_id": "-100",
"thread_id": "42",
"notify_on_complete": True,
}
await runner._run_process_watcher(watcher)
assert adapter.handle_message.await_count == 1
event = adapter.handle_message.await_args.args[0]
assert event.internal is True
assert event.source.platform == Platform.TELEGRAM
assert event.source.chat_id == "-100"
assert event.source.chat_type == "group"
assert event.source.thread_id == "42"
assert event.source.user_id == "user-42"
assert event.source.user_name == "alice"
@pytest.mark.asyncio
async def test_none_user_id_skips_pairing(monkeypatch, tmp_path):
"""A non-internal event with user_id=None should be silently dropped."""

View file

@ -92,6 +92,25 @@ class TestCheckWatchPatterns:
assert "disk full" in evt["output"]
assert evt["session_id"] == "proc_test_watch"
def test_match_carries_session_key_and_watcher_routing_metadata(self, registry):
session = _make_session(watch_patterns=["ERROR"])
session.session_key = "agent:main:telegram:group:-100:42"
session.watcher_platform = "telegram"
session.watcher_chat_id = "-100"
session.watcher_user_id = "u123"
session.watcher_user_name = "alice"
session.watcher_thread_id = "42"
registry._check_watch_patterns(session, "ERROR: disk full\n")
evt = registry.completion_queue.get_nowait()
assert evt["session_key"] == "agent:main:telegram:group:-100:42"
assert evt["platform"] == "telegram"
assert evt["chat_id"] == "-100"
assert evt["user_id"] == "u123"
assert evt["user_name"] == "alice"
assert evt["thread_id"] == "42"
def test_multiple_patterns(self, registry):
"""First matching pattern is reported."""
session = _make_session(watch_patterns=["WARN", "ERROR"])

View file

@ -191,9 +191,15 @@ class ProcessRegistry:
session._watch_disabled = True
self.completion_queue.put({
"session_id": session.id,
"session_key": session.session_key,
"command": session.command,
"type": "watch_disabled",
"suppressed": session._watch_suppressed,
"platform": session.watcher_platform,
"chat_id": session.watcher_chat_id,
"user_id": session.watcher_user_id,
"user_name": session.watcher_user_name,
"thread_id": session.watcher_thread_id,
"message": (
f"Watch patterns disabled for process {session.id}"
f"too many matches ({session._watch_suppressed} suppressed). "
@ -219,11 +225,17 @@ class ProcessRegistry:
self.completion_queue.put({
"session_id": session.id,
"session_key": session.session_key,
"command": session.command,
"type": "watch_match",
"pattern": matched_pattern,
"output": output,
"suppressed": suppressed,
"platform": session.watcher_platform,
"chat_id": session.watcher_chat_id,
"user_id": session.watcher_user_id,
"user_name": session.watcher_user_name,
"thread_id": session.watcher_thread_id,
})
@staticmethod