diff --git a/cli.py b/cli.py index 0b010a9873..9a8cce571b 100644 --- a/cli.py +++ b/cli.py @@ -4204,6 +4204,8 @@ class HermesCLI: self.agent.flush_memories(self.conversation_history) except (Exception, KeyboardInterrupt): pass + # Trigger memory extraction on the old session before session_id rotates. + self.agent.commit_memory_session(self.conversation_history) self._notify_session_boundary("on_session_finalize") elif self.agent: # First session or empty history — still finalize the old session diff --git a/cron/scheduler.py b/cron/scheduler.py index cd4576c9f1..78a20cf7f5 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -10,6 +10,7 @@ runs at a time if multiple processes overlap. import asyncio import concurrent.futures +import contextvars import json import logging import os @@ -770,7 +771,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None _POLL_INTERVAL = 5.0 _cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) - _cron_future = _cron_pool.submit(agent.run_conversation, prompt) + # Preserve scheduler-scoped ContextVar state (for example skill-declared + # env passthrough registrations) when the cron run hops into the worker + # thread used for inactivity timeout monitoring. + _cron_context = contextvars.copy_context() + _cron_future = _cron_pool.submit(_cron_context.run, agent.run_conversation, prompt) _inactivity_timeout = False try: if _cron_inactivity_limit is None: diff --git a/gateway/run.py b/gateway/run.py index c3e3d56a94..0d8bbe5fb6 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -482,6 +482,27 @@ def _resolve_hermes_bin() -> Optional[list[str]]: return None +def _parse_session_key(session_key: str) -> "dict | None": + """Parse a session key into its component parts. + + Session keys follow the format + ``agent:main:{platform}:{chat_type}:{chat_id}[:{thread_id}[:{user_id}]]``. + Returns a dict with ``platform``, ``chat_type``, ``chat_id``, and + optionally ``thread_id`` keys, or None if the key doesn't match. + """ + parts = session_key.split(":") + if len(parts) >= 5 and parts[0] == "agent" and parts[1] == "main": + result = { + "platform": parts[2], + "chat_type": parts[3], + "chat_id": parts[4], + } + if len(parts) > 5: + result["thread_id"] = parts[5] + return result + return None + + def _format_gateway_process_notification(evt: dict) -> "str | None": """Format a watch pattern event from completion_queue into a [SYSTEM:] message.""" evt_type = evt.get("type", "completion") @@ -1489,12 +1510,11 @@ class GatewayRunner: notified: set = set() for session_key in active: # Parse platform + chat_id from the session key. - # Format: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...] - parts = session_key.split(":") - if len(parts) < 5: + _parsed = _parse_session_key(session_key) + if not _parsed: continue - platform_str = parts[2] - chat_id = parts[4] + platform_str = _parsed["platform"] + chat_id = _parsed["chat_id"] # Deduplicate: one notification per chat, even if multiple # sessions (different users/threads) share the same chat. @@ -1510,7 +1530,7 @@ class GatewayRunner: # Include thread_id if present so the message lands in the # correct forum topic / thread. - thread_id = parts[5] if len(parts) > 5 else None + thread_id = _parsed.get("thread_id") metadata = {"thread_id": thread_id} if thread_id else None await adapter.send(chat_id, msg, metadata=metadata) @@ -3965,7 +3985,7 @@ class GatewayRunner: synth_text = _format_gateway_process_notification(evt) if synth_text: try: - await self._inject_watch_notification(synth_text, event) + await self._inject_watch_notification(synth_text, evt) except Exception as e2: logger.error("Watch notification injection error: %s", e2) except Exception as e: @@ -7549,14 +7569,75 @@ class GatewayRunner: return prefix return user_text - async def _inject_watch_notification(self, synth_text: str, original_event) -> None: + def _build_process_event_source(self, evt: dict): + """Resolve the canonical source for a synthetic background-process event. + + Prefer the persisted session-store origin for the event's session key. + Falling back to the currently active foreground event is what causes + cross-topic bleed, so don't do that. + """ + from gateway.session import SessionSource + + session_key = str(evt.get("session_key") or "").strip() + derived_platform = "" + derived_chat_type = "" + derived_chat_id = "" + + if session_key: + try: + self.session_store._ensure_loaded() + entry = self.session_store._entries.get(session_key) + if entry and getattr(entry, "origin", None): + return entry.origin + except Exception as exc: + logger.debug( + "Synthetic process-event session-store lookup failed for %s: %s", + session_key, + exc, + ) + + _parsed = _parse_session_key(session_key) + if _parsed: + derived_platform = _parsed["platform"] + derived_chat_type = _parsed["chat_type"] + derived_chat_id = _parsed["chat_id"] + + platform_name = str(evt.get("platform") or derived_platform or "").strip().lower() + chat_type = str(evt.get("chat_type") or derived_chat_type or "").strip().lower() + chat_id = str(evt.get("chat_id") or derived_chat_id or "").strip() + if not platform_name or not chat_type or not chat_id: + return None + + try: + platform = Platform(platform_name) + except Exception: + logger.warning( + "Synthetic process event has invalid platform metadata: %r", + platform_name, + ) + return None + + return SessionSource( + platform=platform, + chat_id=chat_id, + chat_type=chat_type, + thread_id=str(evt.get("thread_id") or "").strip() or None, + user_id=str(evt.get("user_id") or "").strip() or None, + user_name=str(evt.get("user_name") or "").strip() or None, + ) + + async def _inject_watch_notification(self, synth_text: str, evt: dict) -> None: """Inject a watch-pattern notification as a synthetic message event. - Uses the source from the original user event to route the notification - back to the correct chat/adapter. + Routing must come from the queued watch event itself, not from whatever + foreground message happened to be active when the queue was drained. """ - source = getattr(original_event, "source", None) + source = self._build_process_event_source(evt) if not source: + logger.warning( + "Dropping watch notification with no routing metadata for process %s", + evt.get("session_id", "unknown"), + ) return platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform) adapter = None @@ -7574,7 +7655,12 @@ class GatewayRunner: source=source, internal=True, ) - logger.info("Watch pattern notification — injecting for %s", platform_name) + logger.info( + "Watch pattern notification — injecting for %s chat=%s thread=%s", + platform_name, + source.chat_id, + source.thread_id, + ) await adapter.handle_message(synth_event) except Exception as e: logger.error("Watch notification injection error: %s", e) @@ -7644,33 +7730,42 @@ class GatewayRunner: f"Command: {session.command}\n" f"Output:\n{_out}]" ) + source = self._build_process_event_source({ + "session_id": session_id, + "session_key": session_key, + "platform": platform_name, + "chat_id": chat_id, + "thread_id": thread_id, + "user_id": user_id, + "user_name": user_name, + }) + if not source: + logger.warning( + "Dropping completion notification with no routing metadata for process %s", + session_id, + ) + break + adapter = None for p, a in self.adapters.items(): - if p.value == platform_name: + if p == source.platform: adapter = a break - if adapter and chat_id: + if adapter and source.chat_id: try: from gateway.platforms.base import MessageEvent, MessageType - from gateway.session import SessionSource - from gateway.config import Platform - _platform_enum = Platform(platform_name) - _source = SessionSource( - platform=_platform_enum, - chat_id=chat_id, - thread_id=thread_id or None, - user_id=user_id or None, - user_name=user_name or None, - ) synth_event = MessageEvent( text=synth_text, message_type=MessageType.TEXT, - source=_source, + source=source, internal=True, ) logger.info( - "Process %s finished — injecting agent notification for session %s", - session_id, session_key, + "Process %s finished — injecting agent notification for session %s chat=%s thread=%s", + session_id, + session_key, + source.chat_id, + source.thread_id, ) await adapter.handle_message(synth_event) except Exception as e: diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 1777d423bd..86d7ad5efb 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -10,8 +10,9 @@ lifecycle instead of read-only search endpoints. Config via environment variables (profile-scoped via each profile's .env): OPENVIKING_ENDPOINT — Server URL (default: http://127.0.0.1:1933) OPENVIKING_API_KEY — API key (required for authenticated servers) - OPENVIKING_ACCOUNT — Tenant account (default: root) + OPENVIKING_ACCOUNT — Tenant account (default: default) OPENVIKING_USER — Tenant user (default: default) + OPENVIKING_AGENT — Tenant agent (default: hermes) Capabilities: - Automatic memory extraction on session commit (6 categories) @@ -80,11 +81,12 @@ class _VikingClient: """Thin HTTP client for the OpenViking REST API.""" def __init__(self, endpoint: str, api_key: str = "", - account: str = "", user: str = ""): + account: str = "", user: str = "", agent: str = ""): self._endpoint = endpoint.rstrip("/") self._api_key = api_key - self._account = account or os.environ.get("OPENVIKING_ACCOUNT", "root") + self._account = account or os.environ.get("OPENVIKING_ACCOUNT", "default") self._user = user or os.environ.get("OPENVIKING_USER", "default") + self._agent = agent or os.environ.get("OPENVIKING_AGENT", "hermes") self._httpx = _get_httpx() if self._httpx is None: raise ImportError("httpx is required for OpenViking: pip install httpx") @@ -94,6 +96,7 @@ class _VikingClient: "Content-Type": "application/json", "X-OpenViking-Account": self._account, "X-OpenViking-User": self._user, + "X-OpenViking-Agent": self._agent, } if self._api_key: h["X-API-Key"] = self._api_key @@ -282,20 +285,44 @@ class OpenVikingMemoryProvider(MemoryProvider): }, { "key": "api_key", - "description": "OpenViking API key", + "description": "OpenViking API key (leave blank for local dev mode)", "secret": True, "env_var": "OPENVIKING_API_KEY", }, + { + "key": "account", + "description": "OpenViking tenant account ID ([default], used when local mode, OPENVIKING_API_KEY is empty)", + "default": "default", + "env_var": "OPENVIKING_ACCOUNT", + }, + { + "key": "user", + "description": "OpenViking user ID within the account ([default], used when local mode, OPENVIKING_API_KEY is empty)", + "default": "default", + "env_var": "OPENVIKING_USER", + }, + { + "key": "agent", + "description": "OpenViking agent ID within the account ([hermes], useful in multi-agent mode)", + "default": "hermes", + "env_var": "OPENVIKING_AGENT", + }, ] def initialize(self, session_id: str, **kwargs) -> None: self._endpoint = os.environ.get("OPENVIKING_ENDPOINT", _DEFAULT_ENDPOINT) self._api_key = os.environ.get("OPENVIKING_API_KEY", "") + self._account = os.environ.get("OPENVIKING_ACCOUNT", "default") + self._user = os.environ.get("OPENVIKING_USER", "default") + self._agent = os.environ.get("OPENVIKING_AGENT", "hermes") self._session_id = session_id self._turn_count = 0 try: - self._client = _VikingClient(self._endpoint, self._api_key) + self._client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) if not self._client.health(): logger.warning("OpenViking server at %s is not reachable", self._endpoint) self._client = None @@ -325,7 +352,8 @@ class OpenVikingMemoryProvider(MemoryProvider): "(abstract/overview/full), viking_browse to explore.\n" "Use viking_remember to store facts, viking_add_resource to index URLs/docs." ) - except Exception: + except Exception as e: + logger.warning("OpenViking system_prompt_block failed: %s", e) return ( "# OpenViking Knowledge Base\n" f"Active. Endpoint: {self._endpoint}\n" @@ -351,7 +379,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _run(): try: - client = _VikingClient(self._endpoint, self._api_key) + client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) resp = client.post("/api/v1/search/find", { "query": query, "top_k": 5, @@ -386,7 +417,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _sync(): try: - client = _VikingClient(self._endpoint, self._api_key) + client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) sid = self._session_id # Add user message @@ -442,7 +476,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _write(): try: - client = _VikingClient(self._endpoint, self._api_key) + client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) # Add as a user message with memory context so the commit # picks it up as an explicit memory during extraction client.post(f"/api/v1/sessions/{self._session_id}/messages", { diff --git a/run_agent.py b/run_agent.py index 8731cbaa0f..bea01e6709 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3040,6 +3040,18 @@ class AIAgent: except Exception: pass + def commit_memory_session(self, messages: list = None) -> None: + """Trigger end-of-session extraction without tearing providers down. + Called when session_id rotates (e.g. /new, context compression); + providers keep their state and continue running under the old + session_id — they just flush pending extraction now.""" + if not self._memory_manager: + return + try: + self._memory_manager.on_session_end(messages or []) + except Exception: + pass + def close(self) -> None: """Release all resources held by this agent instance. @@ -6827,6 +6839,8 @@ class AIAgent: try: # Propagate title to the new session with auto-numbering old_title = self._session_db.get_session_title(self.session_id) + # Trigger memory extraction on the old session before it rotates. + self.commit_memory_session(messages) self._session_db.end_session(self.session_id, "compression") old_session_id = self.session_id self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" diff --git a/scripts/release.py b/scripts/release.py index 0462556275..73d663e55a 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -197,6 +197,8 @@ AUTHOR_MAP = { "zhouboli@gmail.com": "zhouboli", "zqiao@microsoft.com": "tomqiaozc", "zzn+pa@zzn.im": "xinbenlv", + "zaynjarvis@gmail.com": "ZaynJarvis", + "zhiheng.liu@bytedance.com": "ZaynJarvis", } diff --git a/tests/agent/test_memory_provider.py b/tests/agent/test_memory_provider.py index fe04e0dd43..505f40bd55 100644 --- a/tests/agent/test_memory_provider.py +++ b/tests/agent/test_memory_provider.py @@ -695,3 +695,44 @@ class TestMemoryContextFencing: fence_end = combined.index("") assert "Alice" in combined[fence_start:fence_end] assert combined.index("weather") < fence_start + + +# --------------------------------------------------------------------------- +# AIAgent.commit_memory_session — routes to MemoryManager.on_session_end +# --------------------------------------------------------------------------- + + +class _CommitRecorder(FakeMemoryProvider): + """Provider that records on_session_end calls for assertions.""" + + def __init__(self, name="recorder"): + super().__init__(name) + self.end_calls = [] + + def on_session_end(self, messages): + self.end_calls.append(list(messages or [])) + + +class TestCommitMemorySessionRouting: + def test_on_session_end_fans_out(self): + mgr = MemoryManager() + builtin = _CommitRecorder("builtin") + external = _CommitRecorder("openviking") + mgr.add_provider(builtin) + mgr.add_provider(external) + + msgs = [{"role": "user", "content": "hi"}] + mgr.on_session_end(msgs) + + assert builtin.end_calls == [msgs] + assert external.end_calls == [msgs] + + def test_on_session_end_tolerates_failure(self): + mgr = MemoryManager() + builtin = FakeMemoryProvider("builtin") + bad = _CommitRecorder("bad-provider") + bad.on_session_end = lambda m: (_ for _ in ()).throw(RuntimeError("boom")) + mgr.add_provider(builtin) + mgr.add_provider(bad) + + mgr.on_session_end([]) # must not raise diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 50d3cf14f6..a1cc2e1277 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -8,6 +8,8 @@ from unittest.mock import AsyncMock, patch, MagicMock import pytest from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt +from tools.env_passthrough import clear_env_passthrough +from tools.credential_files import clear_credential_files class TestResolveOrigin: @@ -877,6 +879,117 @@ class TestRunJobPerJobOverrides: class TestRunJobSkillBacked: + def test_run_job_preserves_skill_env_passthrough_into_worker_thread(self, tmp_path): + job = { + "id": "skill-env-job", + "name": "skill env test", + "prompt": "Use the skill.", + "skill": "notion", + } + + fake_db = MagicMock() + + def _skill_view(name): + assert name == "notion" + from tools.env_passthrough import register_env_passthrough + + register_env_passthrough(["NOTION_API_KEY"]) + return json.dumps({"success": True, "content": "# notion\nUse Notion."}) + + def _run_conversation(prompt): + from tools.env_passthrough import get_all_passthrough + + assert "NOTION_API_KEY" in get_all_passthrough() + return {"final_response": "ok"} + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("tools.skills_tool.skill_view", side_effect=_skill_view), \ + patch("run_agent.AIAgent") as mock_agent_cls: + mock_agent = MagicMock() + mock_agent.run_conversation.side_effect = _run_conversation + mock_agent_cls.return_value = mock_agent + + try: + success, output, final_response, error = run_job(job) + finally: + clear_env_passthrough() + + assert success is True + assert error is None + assert final_response == "ok" + + def test_run_job_preserves_credential_file_passthrough_into_worker_thread(self, tmp_path): + """copy_context() also propagates credential_files ContextVar.""" + job = { + "id": "cred-env-job", + "name": "cred file test", + "prompt": "Use the skill.", + "skill": "google-workspace", + } + + fake_db = MagicMock() + + # Create a credential file so register_credential_file succeeds + cred_dir = tmp_path / "credentials" + cred_dir.mkdir() + (cred_dir / "google_token.json").write_text('{"token": "t"}') + + def _skill_view(name): + assert name == "google-workspace" + from tools.credential_files import register_credential_file + + register_credential_file("credentials/google_token.json") + return json.dumps({"success": True, "content": "# google-workspace\nUse Google."}) + + def _run_conversation(prompt): + from tools.credential_files import _get_registered + + registered = _get_registered() + assert registered, "credential files must be visible in worker thread" + assert any("google_token.json" in v for v in registered.values()) + return {"final_response": "ok"} + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("tools.credential_files._resolve_hermes_home", return_value=tmp_path), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("tools.skills_tool.skill_view", side_effect=_skill_view), \ + patch("run_agent.AIAgent") as mock_agent_cls: + mock_agent = MagicMock() + mock_agent.run_conversation.side_effect = _run_conversation + mock_agent_cls.return_value = mock_agent + + try: + success, output, final_response, error = run_job(job) + finally: + clear_credential_files() + + assert success is True + assert error is None + assert final_response == "ok" + def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path): job = { "id": "skill-job", diff --git a/tests/gateway/test_background_process_notifications.py b/tests/gateway/test_background_process_notifications.py index 9c1404f89c..eabf92be63 100644 --- a/tests/gateway/test_background_process_notifications.py +++ b/tests/gateway/test_background_process_notifications.py @@ -14,7 +14,7 @@ from unittest.mock import AsyncMock, patch import pytest from gateway.config import GatewayConfig, Platform -from gateway.run import GatewayRunner +from gateway.run import GatewayRunner, _parse_session_key # --------------------------------------------------------------------------- @@ -45,7 +45,7 @@ def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner: monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) runner = GatewayRunner(GatewayConfig()) - adapter = SimpleNamespace(send=AsyncMock()) + adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock()) runner.adapters[Platform.TELEGRAM] = adapter return runner @@ -243,3 +243,162 @@ async def test_no_thread_id_sends_no_metadata(monkeypatch, tmp_path): assert adapter.send.await_count == 1 _, kwargs = adapter.send.call_args assert kwargs["metadata"] is None + + +@pytest.mark.asyncio +async def test_inject_watch_notification_routes_from_session_store_origin(monkeypatch, tmp_path): + from gateway.session import SessionSource + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="123", + user_name="Emiliyan", + ) + ) + + evt = { + "session_id": "proc_watch", + "session_key": "agent:main:telegram:group:-100:42", + } + + await runner._inject_watch_notification("[SYSTEM: Background process matched]", evt) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + assert synth_event.internal is True + assert synth_event.source.platform == Platform.TELEGRAM + assert synth_event.source.chat_id == "-100" + assert synth_event.source.chat_type == "group" + assert synth_event.source.thread_id == "42" + assert synth_event.source.user_id == "123" + assert synth_event.source.user_name == "Emiliyan" + + +def test_build_process_event_source_falls_back_to_session_key_chat_type(monkeypatch, tmp_path): + runner = _build_runner(monkeypatch, tmp_path, "all") + + evt = { + "session_id": "proc_watch", + "session_key": "agent:main:telegram:group:-100:42", + "platform": "telegram", + "chat_id": "-100", + "thread_id": "42", + "user_id": "123", + "user_name": "Emiliyan", + } + + source = runner._build_process_event_source(evt) + + assert source is not None + assert source.platform == Platform.TELEGRAM + assert source.chat_id == "-100" + assert source.chat_type == "group" + assert source.thread_id == "42" + assert source.user_id == "123" + assert source.user_name == "Emiliyan" + + +@pytest.mark.asyncio +async def test_inject_watch_notification_ignores_foreground_event_source(monkeypatch, tmp_path): + """Negative test: watch notification must NOT route to the foreground thread.""" + from gateway.session import SessionSource + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + + # Session store has the process's original thread (thread 42) + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="proc_owner", + user_name="alice", + ) + ) + + # The evt dict carries the correct session_key — NOT a foreground event + evt = { + "session_id": "proc_cross_thread", + "session_key": "agent:main:telegram:group:-100:42", + } + + await runner._inject_watch_notification("[SYSTEM: watch match]", evt) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + # Must route to thread 42 (process origin), NOT some other thread + assert synth_event.source.thread_id == "42" + assert synth_event.source.user_id == "proc_owner" + + +def test_build_process_event_source_returns_none_for_empty_evt(monkeypatch, tmp_path): + """Missing session_key and no platform metadata → None (drop notification).""" + runner = _build_runner(monkeypatch, tmp_path, "all") + + source = runner._build_process_event_source({"session_id": "proc_orphan"}) + assert source is None + + +def test_build_process_event_source_returns_none_for_invalid_platform(monkeypatch, tmp_path): + """Invalid platform string → None.""" + runner = _build_runner(monkeypatch, tmp_path, "all") + + evt = { + "session_id": "proc_bad", + "platform": "not_a_real_platform", + "chat_type": "dm", + "chat_id": "123", + } + source = runner._build_process_event_source(evt) + assert source is None + + +def test_build_process_event_source_returns_none_for_short_session_key(monkeypatch, tmp_path): + """Session key with <5 parts doesn't parse, falls through to empty metadata → None.""" + runner = _build_runner(monkeypatch, tmp_path, "all") + + evt = { + "session_id": "proc_short", + "session_key": "agent:main:telegram", # Too few parts + } + source = runner._build_process_event_source(evt) + assert source is None + + +# --------------------------------------------------------------------------- +# _parse_session_key helper +# --------------------------------------------------------------------------- + +def test_parse_session_key_valid(): + result = _parse_session_key("agent:main:telegram:group:-100") + assert result == {"platform": "telegram", "chat_type": "group", "chat_id": "-100"} + + +def test_parse_session_key_with_extra_parts(): + """Thread ID (6th part) is extracted; further parts are ignored.""" + result = _parse_session_key("agent:main:discord:group:chan123:thread456") + assert result == {"platform": "discord", "chat_type": "group", "chat_id": "chan123", "thread_id": "thread456"} + + +def test_parse_session_key_with_user_id_part(): + """7th part (user_id) is ignored — only up to thread_id is extracted.""" + result = _parse_session_key("agent:main:telegram:group:chat1:thread42:user99") + assert result == {"platform": "telegram", "chat_type": "group", "chat_id": "chat1", "thread_id": "thread42"} + + +def test_parse_session_key_too_short(): + assert _parse_session_key("agent:main:telegram") is None + assert _parse_session_key("") is None + + +def test_parse_session_key_wrong_prefix(): + assert _parse_session_key("cron:main:telegram:dm:123") is None + assert _parse_session_key("agent:cron:telegram:dm:123") is None diff --git a/tests/gateway/test_internal_event_bypass_pairing.py b/tests/gateway/test_internal_event_bypass_pairing.py index 1c3f9f0c94..d10195b2d5 100644 --- a/tests/gateway/test_internal_event_bypass_pairing.py +++ b/tests/gateway/test_internal_event_bypass_pairing.py @@ -230,6 +230,59 @@ async def test_notify_on_complete_preserves_user_identity(monkeypatch, tmp_path) assert event.source.user_name == "alice" +@pytest.mark.asyncio +async def test_notify_on_complete_uses_session_store_origin_for_group_topic(monkeypatch, tmp_path): + import tools.process_registry as pr_module + from gateway.session import SessionSource + + sessions = [ + SimpleNamespace( + output_buffer="done\n", exited=True, exit_code=0, command="echo test" + ), + ] + monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions)) + + async def _instant_sleep(*_a, **_kw): + pass + monkeypatch.setattr(asyncio, "sleep", _instant_sleep) + + runner = GatewayRunner(GatewayConfig()) + adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock()) + runner.adapters[Platform.TELEGRAM] = adapter + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="user-42", + user_name="alice", + ) + ) + + watcher = { + "session_id": "proc_test_internal", + "check_interval": 0, + "session_key": "agent:main:telegram:group:-100:42", + "platform": "telegram", + "chat_id": "-100", + "thread_id": "42", + "notify_on_complete": True, + } + + await runner._run_process_watcher(watcher) + + assert adapter.handle_message.await_count == 1 + event = adapter.handle_message.await_args.args[0] + assert event.internal is True + assert event.source.platform == Platform.TELEGRAM + assert event.source.chat_id == "-100" + assert event.source.chat_type == "group" + assert event.source.thread_id == "42" + assert event.source.user_id == "user-42" + assert event.source.user_name == "alice" + + @pytest.mark.asyncio async def test_none_user_id_skips_pairing(monkeypatch, tmp_path): """A non-internal event with user_id=None should be silently dropped.""" diff --git a/tests/tools/test_watch_patterns.py b/tests/tools/test_watch_patterns.py index e31844f9f6..0621edc14d 100644 --- a/tests/tools/test_watch_patterns.py +++ b/tests/tools/test_watch_patterns.py @@ -92,6 +92,25 @@ class TestCheckWatchPatterns: assert "disk full" in evt["output"] assert evt["session_id"] == "proc_test_watch" + def test_match_carries_session_key_and_watcher_routing_metadata(self, registry): + session = _make_session(watch_patterns=["ERROR"]) + session.session_key = "agent:main:telegram:group:-100:42" + session.watcher_platform = "telegram" + session.watcher_chat_id = "-100" + session.watcher_user_id = "u123" + session.watcher_user_name = "alice" + session.watcher_thread_id = "42" + + registry._check_watch_patterns(session, "ERROR: disk full\n") + evt = registry.completion_queue.get_nowait() + + assert evt["session_key"] == "agent:main:telegram:group:-100:42" + assert evt["platform"] == "telegram" + assert evt["chat_id"] == "-100" + assert evt["user_id"] == "u123" + assert evt["user_name"] == "alice" + assert evt["thread_id"] == "42" + def test_multiple_patterns(self, registry): """First matching pattern is reported.""" session = _make_session(watch_patterns=["WARN", "ERROR"]) diff --git a/tools/process_registry.py b/tools/process_registry.py index 93d102fd59..0e5942529a 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -202,9 +202,15 @@ class ProcessRegistry: session._watch_disabled = True self.completion_queue.put({ "session_id": session.id, + "session_key": session.session_key, "command": session.command, "type": "watch_disabled", "suppressed": session._watch_suppressed, + "platform": session.watcher_platform, + "chat_id": session.watcher_chat_id, + "user_id": session.watcher_user_id, + "user_name": session.watcher_user_name, + "thread_id": session.watcher_thread_id, "message": ( f"Watch patterns disabled for process {session.id} — " f"too many matches ({session._watch_suppressed} suppressed). " @@ -230,11 +236,17 @@ class ProcessRegistry: self.completion_queue.put({ "session_id": session.id, + "session_key": session.session_key, "command": session.command, "type": "watch_match", "pattern": matched_pattern, "output": output, "suppressed": suppressed, + "platform": session.watcher_platform, + "chat_id": session.watcher_chat_id, + "user_id": session.watcher_user_id, + "user_name": session.watcher_user_name, + "thread_id": session.watcher_thread_id, }) @staticmethod diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 65f84e1464..55f4c10a81 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1384,14 +1384,10 @@ def terminal_tool( if pty_disabled_reason: result_data["pty_note"] = pty_disabled_reason - # Mark for agent notification on completion - if notify_on_complete and background: - proc_session.notify_on_complete = True - result_data["notify_on_complete"] = True - - # In gateway mode, auto-register a fast watcher so the - # gateway can detect completion and trigger a new agent - # turn. CLI mode uses the completion_queue directly. + # Populate routing metadata on the session so that + # watch-pattern and completion notifications can be + # routed back to the correct chat/thread. + if background and (notify_on_complete or watch_patterns): from gateway.session_context import get_session_env as _gse _gw_platform = _gse("HERMES_SESSION_PLATFORM", "") if _gw_platform: @@ -1404,16 +1400,26 @@ def terminal_tool( proc_session.watcher_user_id = _gw_user_id proc_session.watcher_user_name = _gw_user_name proc_session.watcher_thread_id = _gw_thread_id + + # Mark for agent notification on completion + if notify_on_complete and background: + proc_session.notify_on_complete = True + result_data["notify_on_complete"] = True + + # In gateway mode, auto-register a fast watcher so the + # gateway can detect completion and trigger a new agent + # turn. CLI mode uses the completion_queue directly. + if proc_session.watcher_platform: proc_session.watcher_interval = 5 process_registry.pending_watchers.append({ "session_id": proc_session.id, "check_interval": 5, "session_key": session_key, - "platform": _gw_platform, - "chat_id": _gw_chat_id, - "user_id": _gw_user_id, - "user_name": _gw_user_name, - "thread_id": _gw_thread_id, + "platform": proc_session.watcher_platform, + "chat_id": proc_session.watcher_chat_id, + "user_id": proc_session.watcher_user_id, + "user_name": proc_session.watcher_user_name, + "thread_id": proc_session.watcher_thread_id, "notify_on_complete": True, }) diff --git a/website/src/css/custom.css b/website/src/css/custom.css index cfc90c7f9e..eda3ec1a72 100644 --- a/website/src/css/custom.css +++ b/website/src/css/custom.css @@ -8,20 +8,24 @@ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&family=JetBrains+Mono:wght@400;500&display=swap'); :root { - /* Gold/Amber palette from landing page */ - --ifm-color-primary: #FFD700; - --ifm-color-primary-dark: #E6C200; - --ifm-color-primary-darker: #D9B700; - --ifm-color-primary-darkest: #B39600; - --ifm-color-primary-light: #FFDD33; - --ifm-color-primary-lighter: #FFE14D; - --ifm-color-primary-lightest: #FFEB80; + /* Dark amber palette for light mode — readable on white (WCAG AA compliant) + Current gold #FFD700 has only 1.4:1 contrast on white; these tones pass 4.5:1+ */ + --ifm-color-primary: #8B6508; + --ifm-color-primary-dark: #7A5800; + --ifm-color-primary-darker: #6E4F00; + --ifm-color-primary-darkest: #5A4100; + --ifm-color-primary-light: #9E7410; + --ifm-color-primary-lighter: #B38319; + --ifm-color-primary-lightest: #C89222; --ifm-font-family-base: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; --ifm-font-family-monospace: 'JetBrains Mono', 'Fira Code', 'Cascadia Code', monospace; --ifm-code-font-size: 90%; --ifm-heading-font-weight: 600; + + --ifm-link-color: #7A5800; + --ifm-link-hover-color: #5A4100; } /* Dark mode — the PRIMARY mode, matches landing page */ @@ -91,6 +95,13 @@ padding-left: calc(var(--ifm-menu-link-padding-horizontal) - 3px); } +/* Light mode sidebar active */ +[data-theme='light'] .menu__link--active:not(.menu__link--sublist) { + background-color: rgba(139, 101, 8, 0.08); + border-left: 3px solid #8B6508; + padding-left: calc(var(--ifm-menu-link-padding-horizontal) - 3px); +} + /* Code blocks */ [data-theme='dark'] .prism-code { background-color: #0a0a12 !important; @@ -167,6 +178,16 @@ pre.prism-code.language-ascii code { border-color: rgba(255, 215, 0, 0.06); } +/* Light mode table styling */ +[data-theme='light'] table th { + background-color: rgba(139, 101, 8, 0.06); + border-color: rgba(139, 101, 8, 0.15); +} + +[data-theme='light'] table td { + border-color: rgba(139, 101, 8, 0.10); +} + /* Footer */ .footer { border-top: 1px solid rgba(255, 215, 0, 0.08); @@ -177,11 +198,16 @@ pre.prism-code.language-ascii code { transition: color 0.2s; } -.footer a:hover { +[data-theme='dark'] .footer a:hover { color: #FFD700; text-decoration: none; } +[data-theme='light'] .footer a:hover { + color: #7A5800; + text-decoration: none; +} + /* Scrollbar */ [data-theme='dark'] ::-webkit-scrollbar { width: 8px;