From aa398ad6553031da2e80d08779904725fe48b992 Mon Sep 17 00:00:00 2001 From: helix4u <4317663+helix4u@users.noreply.github.com> Date: Fri, 10 Apr 2026 21:32:24 -0600 Subject: [PATCH 01/16] fix(cron): preserve skill env passthrough in worker thread --- cron/scheduler.py | 7 ++++- tests/cron/test_scheduler.py | 52 ++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index cd4576c9f1..78a20cf7f5 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -10,6 +10,7 @@ runs at a time if multiple processes overlap. import asyncio import concurrent.futures +import contextvars import json import logging import os @@ -770,7 +771,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None _POLL_INTERVAL = 5.0 _cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) - _cron_future = _cron_pool.submit(agent.run_conversation, prompt) + # Preserve scheduler-scoped ContextVar state (for example skill-declared + # env passthrough registrations) when the cron run hops into the worker + # thread used for inactivity timeout monitoring. + _cron_context = contextvars.copy_context() + _cron_future = _cron_pool.submit(_cron_context.run, agent.run_conversation, prompt) _inactivity_timeout = False try: if _cron_inactivity_limit is None: diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 50d3cf14f6..6ebdaf4152 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -8,6 +8,7 @@ from unittest.mock import AsyncMock, patch, MagicMock import pytest from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt +from tools.env_passthrough import clear_env_passthrough class TestResolveOrigin: @@ -877,6 +878,57 @@ class TestRunJobPerJobOverrides: class TestRunJobSkillBacked: + def test_run_job_preserves_skill_env_passthrough_into_worker_thread(self, tmp_path): + job = { + "id": "skill-env-job", + "name": "skill env test", + "prompt": "Use the skill.", + "skill": "notion", + } + + fake_db = MagicMock() + + def _skill_view(name): + assert name == "notion" + from tools.env_passthrough import register_env_passthrough + + register_env_passthrough(["NOTION_API_KEY"]) + return json.dumps({"success": True, "content": "# notion\nUse Notion."}) + + def _run_conversation(prompt): + from tools.env_passthrough import get_all_passthrough + + assert "NOTION_API_KEY" in get_all_passthrough() + return {"final_response": "ok"} + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("tools.skills_tool.skill_view", side_effect=_skill_view), \ + patch("run_agent.AIAgent") as mock_agent_cls: + mock_agent = MagicMock() + mock_agent.run_conversation.side_effect = _run_conversation + mock_agent_cls.return_value = mock_agent + + try: + success, output, final_response, error = run_job(job) + finally: + clear_env_passthrough() + + assert success is True + assert error is None + assert final_response == "ok" + def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path): job = { "id": "skill-job", From da448d4fce50b8ea1f07f93e475a3e59a2b6c867 Mon Sep 17 00:00:00 2001 From: kshitij <82637225+kshitijk4poor@users.noreply.github.com> Date: Wed, 15 Apr 2026 11:11:08 -0700 Subject: [PATCH 02/16] test(cron): add regression test for credential_files ContextVar propagation (#10462) Follow-up to #10459 (salvage of #7527). The copy_context() fix propagates ALL ContextVars into the cron worker thread, including credential_files. This test verifies that skill-declared required_credential_files are visible inside the worker thread, matching the existing env_passthrough regression test. --- tests/cron/test_scheduler.py | 61 ++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 6ebdaf4152..a1cc2e1277 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -9,6 +9,7 @@ import pytest from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt from tools.env_passthrough import clear_env_passthrough +from tools.credential_files import clear_credential_files class TestResolveOrigin: @@ -929,6 +930,66 @@ class TestRunJobSkillBacked: assert error is None assert final_response == "ok" + def test_run_job_preserves_credential_file_passthrough_into_worker_thread(self, tmp_path): + """copy_context() also propagates credential_files ContextVar.""" + job = { + "id": "cred-env-job", + "name": "cred file test", + "prompt": "Use the skill.", + "skill": "google-workspace", + } + + fake_db = MagicMock() + + # Create a credential file so register_credential_file succeeds + cred_dir = tmp_path / "credentials" + cred_dir.mkdir() + (cred_dir / "google_token.json").write_text('{"token": "t"}') + + def _skill_view(name): + assert name == "google-workspace" + from tools.credential_files import register_credential_file + + register_credential_file("credentials/google_token.json") + return json.dumps({"success": True, "content": "# google-workspace\nUse Google."}) + + def _run_conversation(prompt): + from tools.credential_files import _get_registered + + registered = _get_registered() + assert registered, "credential files must be visible in worker thread" + assert any("google_token.json" in v for v in registered.values()) + return {"final_response": "ok"} + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("tools.credential_files._resolve_hermes_home", return_value=tmp_path), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("tools.skills_tool.skill_view", side_effect=_skill_view), \ + patch("run_agent.AIAgent") as mock_agent_cls: + mock_agent = MagicMock() + mock_agent.run_conversation.side_effect = _run_conversation + mock_agent_cls.return_value = mock_agent + + try: + success, output, final_response, error = run_job(job) + finally: + clear_credential_files() + + assert success is True + assert error is None + assert final_response == "ok" + def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path): job = { "id": "skill-job", From dee592a0b1c4ce7ae65a5bea3e996dc42a817308 Mon Sep 17 00:00:00 2001 From: etcircle <33860762+etcircle@users.noreply.github.com> Date: Wed, 15 Apr 2026 16:38:57 +0100 Subject: [PATCH 03/16] fix(gateway): route synthetic background events by session --- gateway/run.py | 117 ++++++++++++++---- .../test_background_process_notifications.py | 61 ++++++++- .../test_internal_event_bypass_pairing.py | 53 ++++++++ tests/tools/test_watch_patterns.py | 19 +++ tools/process_registry.py | 12 ++ 5 files changed, 240 insertions(+), 22 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 2eb745f92b..670ec4c869 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3958,7 +3958,7 @@ class GatewayRunner: synth_text = _format_gateway_process_notification(evt) if synth_text: try: - await self._inject_watch_notification(synth_text, event) + await self._inject_watch_notification(synth_text, evt) except Exception as e2: logger.error("Watch notification injection error: %s", e2) except Exception as e: @@ -7452,14 +7452,75 @@ class GatewayRunner: return prefix return user_text - async def _inject_watch_notification(self, synth_text: str, original_event) -> None: + def _build_process_event_source(self, evt: dict): + """Resolve the canonical source for a synthetic background-process event. + + Prefer the persisted session-store origin for the event's session key. + Falling back to the currently active foreground event is what causes + cross-topic bleed, so don't do that. + """ + from gateway.session import SessionSource + + session_key = str(evt.get("session_key") or "").strip() + derived_platform = "" + derived_chat_type = "" + derived_chat_id = "" + + if session_key: + try: + self.session_store._ensure_loaded() + entry = self.session_store._entries.get(session_key) + if entry and getattr(entry, "origin", None): + return entry.origin + except Exception as exc: + logger.debug( + "Synthetic process-event session-store lookup failed for %s: %s", + session_key, + exc, + ) + + parts = session_key.split(":") + if len(parts) >= 5 and parts[0] == "agent" and parts[1] == "main": + derived_platform = parts[2] + derived_chat_type = parts[3] + derived_chat_id = parts[4] + + platform_name = str(evt.get("platform") or derived_platform or "").strip().lower() + chat_type = str(evt.get("chat_type") or derived_chat_type or "").strip().lower() + chat_id = str(evt.get("chat_id") or derived_chat_id or "").strip() + if not platform_name or not chat_type or not chat_id: + return None + + try: + platform = Platform(platform_name) + except Exception: + logger.warning( + "Synthetic process event has invalid platform metadata: %r", + platform_name, + ) + return None + + return SessionSource( + platform=platform, + chat_id=chat_id, + chat_type=chat_type, + thread_id=str(evt.get("thread_id") or "").strip() or None, + user_id=str(evt.get("user_id") or "").strip() or None, + user_name=str(evt.get("user_name") or "").strip() or None, + ) + + async def _inject_watch_notification(self, synth_text: str, evt: dict) -> None: """Inject a watch-pattern notification as a synthetic message event. - Uses the source from the original user event to route the notification - back to the correct chat/adapter. + Routing must come from the queued watch event itself, not from whatever + foreground message happened to be active when the queue was drained. """ - source = getattr(original_event, "source", None) + source = self._build_process_event_source(evt) if not source: + logger.warning( + "Dropping watch notification with no routing metadata for process %s", + evt.get("session_id", "unknown"), + ) return platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform) adapter = None @@ -7477,7 +7538,12 @@ class GatewayRunner: source=source, internal=True, ) - logger.info("Watch pattern notification — injecting for %s", platform_name) + logger.info( + "Watch pattern notification — injecting for %s chat=%s thread=%s", + platform_name, + source.chat_id, + source.thread_id, + ) await adapter.handle_message(synth_event) except Exception as e: logger.error("Watch notification injection error: %s", e) @@ -7547,33 +7613,42 @@ class GatewayRunner: f"Command: {session.command}\n" f"Output:\n{_out}]" ) + source = self._build_process_event_source({ + "session_id": session_id, + "session_key": session_key, + "platform": platform_name, + "chat_id": chat_id, + "thread_id": thread_id, + "user_id": user_id, + "user_name": user_name, + }) + if not source: + logger.warning( + "Dropping completion notification with no routing metadata for process %s", + session_id, + ) + break + adapter = None for p, a in self.adapters.items(): - if p.value == platform_name: + if p == source.platform: adapter = a break - if adapter and chat_id: + if adapter and source.chat_id: try: from gateway.platforms.base import MessageEvent, MessageType - from gateway.session import SessionSource - from gateway.config import Platform - _platform_enum = Platform(platform_name) - _source = SessionSource( - platform=_platform_enum, - chat_id=chat_id, - thread_id=thread_id or None, - user_id=user_id or None, - user_name=user_name or None, - ) synth_event = MessageEvent( text=synth_text, message_type=MessageType.TEXT, - source=_source, + source=source, internal=True, ) logger.info( - "Process %s finished — injecting agent notification for session %s", - session_id, session_key, + "Process %s finished — injecting agent notification for session %s chat=%s thread=%s", + session_id, + session_key, + source.chat_id, + source.thread_id, ) await adapter.handle_message(synth_event) except Exception as e: diff --git a/tests/gateway/test_background_process_notifications.py b/tests/gateway/test_background_process_notifications.py index 9c1404f89c..90e9e063a1 100644 --- a/tests/gateway/test_background_process_notifications.py +++ b/tests/gateway/test_background_process_notifications.py @@ -45,7 +45,7 @@ def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner: monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) runner = GatewayRunner(GatewayConfig()) - adapter = SimpleNamespace(send=AsyncMock()) + adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock()) runner.adapters[Platform.TELEGRAM] = adapter return runner @@ -243,3 +243,62 @@ async def test_no_thread_id_sends_no_metadata(monkeypatch, tmp_path): assert adapter.send.await_count == 1 _, kwargs = adapter.send.call_args assert kwargs["metadata"] is None + + +@pytest.mark.asyncio +async def test_inject_watch_notification_routes_from_session_store_origin(monkeypatch, tmp_path): + from gateway.session import SessionSource + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="123", + user_name="Emiliyan", + ) + ) + + evt = { + "session_id": "proc_watch", + "session_key": "agent:main:telegram:group:-100:42", + } + + await runner._inject_watch_notification("[SYSTEM: Background process matched]", evt) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + assert synth_event.internal is True + assert synth_event.source.platform == Platform.TELEGRAM + assert synth_event.source.chat_id == "-100" + assert synth_event.source.chat_type == "group" + assert synth_event.source.thread_id == "42" + assert synth_event.source.user_id == "123" + assert synth_event.source.user_name == "Emiliyan" + + +def test_build_process_event_source_falls_back_to_session_key_chat_type(monkeypatch, tmp_path): + runner = _build_runner(monkeypatch, tmp_path, "all") + + evt = { + "session_id": "proc_watch", + "session_key": "agent:main:telegram:group:-100:42", + "platform": "telegram", + "chat_id": "-100", + "thread_id": "42", + "user_id": "123", + "user_name": "Emiliyan", + } + + source = runner._build_process_event_source(evt) + + assert source is not None + assert source.platform == Platform.TELEGRAM + assert source.chat_id == "-100" + assert source.chat_type == "group" + assert source.thread_id == "42" + assert source.user_id == "123" + assert source.user_name == "Emiliyan" diff --git a/tests/gateway/test_internal_event_bypass_pairing.py b/tests/gateway/test_internal_event_bypass_pairing.py index 1c3f9f0c94..d10195b2d5 100644 --- a/tests/gateway/test_internal_event_bypass_pairing.py +++ b/tests/gateway/test_internal_event_bypass_pairing.py @@ -230,6 +230,59 @@ async def test_notify_on_complete_preserves_user_identity(monkeypatch, tmp_path) assert event.source.user_name == "alice" +@pytest.mark.asyncio +async def test_notify_on_complete_uses_session_store_origin_for_group_topic(monkeypatch, tmp_path): + import tools.process_registry as pr_module + from gateway.session import SessionSource + + sessions = [ + SimpleNamespace( + output_buffer="done\n", exited=True, exit_code=0, command="echo test" + ), + ] + monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions)) + + async def _instant_sleep(*_a, **_kw): + pass + monkeypatch.setattr(asyncio, "sleep", _instant_sleep) + + runner = GatewayRunner(GatewayConfig()) + adapter = SimpleNamespace(send=AsyncMock(), handle_message=AsyncMock()) + runner.adapters[Platform.TELEGRAM] = adapter + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="user-42", + user_name="alice", + ) + ) + + watcher = { + "session_id": "proc_test_internal", + "check_interval": 0, + "session_key": "agent:main:telegram:group:-100:42", + "platform": "telegram", + "chat_id": "-100", + "thread_id": "42", + "notify_on_complete": True, + } + + await runner._run_process_watcher(watcher) + + assert adapter.handle_message.await_count == 1 + event = adapter.handle_message.await_args.args[0] + assert event.internal is True + assert event.source.platform == Platform.TELEGRAM + assert event.source.chat_id == "-100" + assert event.source.chat_type == "group" + assert event.source.thread_id == "42" + assert event.source.user_id == "user-42" + assert event.source.user_name == "alice" + + @pytest.mark.asyncio async def test_none_user_id_skips_pairing(monkeypatch, tmp_path): """A non-internal event with user_id=None should be silently dropped.""" diff --git a/tests/tools/test_watch_patterns.py b/tests/tools/test_watch_patterns.py index e31844f9f6..0621edc14d 100644 --- a/tests/tools/test_watch_patterns.py +++ b/tests/tools/test_watch_patterns.py @@ -92,6 +92,25 @@ class TestCheckWatchPatterns: assert "disk full" in evt["output"] assert evt["session_id"] == "proc_test_watch" + def test_match_carries_session_key_and_watcher_routing_metadata(self, registry): + session = _make_session(watch_patterns=["ERROR"]) + session.session_key = "agent:main:telegram:group:-100:42" + session.watcher_platform = "telegram" + session.watcher_chat_id = "-100" + session.watcher_user_id = "u123" + session.watcher_user_name = "alice" + session.watcher_thread_id = "42" + + registry._check_watch_patterns(session, "ERROR: disk full\n") + evt = registry.completion_queue.get_nowait() + + assert evt["session_key"] == "agent:main:telegram:group:-100:42" + assert evt["platform"] == "telegram" + assert evt["chat_id"] == "-100" + assert evt["user_id"] == "u123" + assert evt["user_name"] == "alice" + assert evt["thread_id"] == "42" + def test_multiple_patterns(self, registry): """First matching pattern is reported.""" session = _make_session(watch_patterns=["WARN", "ERROR"]) diff --git a/tools/process_registry.py b/tools/process_registry.py index a5dbc3b1bd..3a274eaa3d 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -191,9 +191,15 @@ class ProcessRegistry: session._watch_disabled = True self.completion_queue.put({ "session_id": session.id, + "session_key": session.session_key, "command": session.command, "type": "watch_disabled", "suppressed": session._watch_suppressed, + "platform": session.watcher_platform, + "chat_id": session.watcher_chat_id, + "user_id": session.watcher_user_id, + "user_name": session.watcher_user_name, + "thread_id": session.watcher_thread_id, "message": ( f"Watch patterns disabled for process {session.id} — " f"too many matches ({session._watch_suppressed} suppressed). " @@ -219,11 +225,17 @@ class ProcessRegistry: self.completion_queue.put({ "session_id": session.id, + "session_key": session.session_key, "command": session.command, "type": "watch_match", "pattern": matched_pattern, "output": output, "suppressed": suppressed, + "platform": session.watcher_platform, + "chat_id": session.watcher_chat_id, + "user_id": session.watcher_user_id, + "user_name": session.watcher_user_name, + "thread_id": session.watcher_thread_id, }) @staticmethod From 2276b721410d81241bbf6053277f8d376c9b8633 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Wed, 15 Apr 2026 22:52:30 +0530 Subject: [PATCH 04/16] fix: follow-up improvements for watch notification routing (#9537) - Populate watcher_* routing fields for watch-only processes (not just notify_on_complete), so watch-pattern events carry direct metadata instead of relying solely on session_key parsing fallback - Extract _parse_session_key() helper to dedupe session key parsing at two call sites in gateway/run.py - Add negative test proving cross-thread leakage doesn't happen - Add edge-case tests for _build_process_event_source returning None (empty evt, invalid platform, short session_key) - Add unit tests for _parse_session_key helper --- gateway/run.py | 36 +++++-- .../test_background_process_notifications.py | 96 ++++++++++++++++++- tools/terminal_tool.py | 32 ++++--- 3 files changed, 140 insertions(+), 24 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 670ec4c869..f9bf9a38b9 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -482,6 +482,23 @@ def _resolve_hermes_bin() -> Optional[list[str]]: return None +def _parse_session_key(session_key: str) -> "dict | None": + """Parse a session key into its component parts. + + Session keys follow the format ``agent:main:{platform}:{chat_type}:{chat_id}``. + Returns a dict with ``platform``, ``chat_type``, and ``chat_id`` keys, + or None if the key doesn't match the expected format. + """ + parts = session_key.split(":") + if len(parts) >= 5 and parts[0] == "agent" and parts[1] == "main": + return { + "platform": parts[2], + "chat_type": parts[3], + "chat_id": parts[4], + } + return None + + def _format_gateway_process_notification(evt: dict) -> "str | None": """Format a watch pattern event from completion_queue into a [SYSTEM:] message.""" evt_type = evt.get("type", "completion") @@ -1489,12 +1506,11 @@ class GatewayRunner: notified: set = set() for session_key in active: # Parse platform + chat_id from the session key. - # Format: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...] - parts = session_key.split(":") - if len(parts) < 5: + _parsed = _parse_session_key(session_key) + if not _parsed: continue - platform_str = parts[2] - chat_id = parts[4] + platform_str = _parsed["platform"] + chat_id = _parsed["chat_id"] # Deduplicate: one notification per chat, even if multiple # sessions (different users/threads) share the same chat. @@ -7479,11 +7495,11 @@ class GatewayRunner: exc, ) - parts = session_key.split(":") - if len(parts) >= 5 and parts[0] == "agent" and parts[1] == "main": - derived_platform = parts[2] - derived_chat_type = parts[3] - derived_chat_id = parts[4] + _parsed = _parse_session_key(session_key) + if _parsed: + derived_platform = _parsed["platform"] + derived_chat_type = _parsed["chat_type"] + derived_chat_id = _parsed["chat_id"] platform_name = str(evt.get("platform") or derived_platform or "").strip().lower() chat_type = str(evt.get("chat_type") or derived_chat_type or "").strip().lower() diff --git a/tests/gateway/test_background_process_notifications.py b/tests/gateway/test_background_process_notifications.py index 90e9e063a1..68eb5e3043 100644 --- a/tests/gateway/test_background_process_notifications.py +++ b/tests/gateway/test_background_process_notifications.py @@ -14,7 +14,7 @@ from unittest.mock import AsyncMock, patch import pytest from gateway.config import GatewayConfig, Platform -from gateway.run import GatewayRunner +from gateway.run import GatewayRunner, _parse_session_key # --------------------------------------------------------------------------- @@ -302,3 +302,97 @@ def test_build_process_event_source_falls_back_to_session_key_chat_type(monkeypa assert source.thread_id == "42" assert source.user_id == "123" assert source.user_name == "Emiliyan" + + +@pytest.mark.asyncio +async def test_inject_watch_notification_ignores_foreground_event_source(monkeypatch, tmp_path): + """Negative test: watch notification must NOT route to the foreground thread.""" + from gateway.session import SessionSource + + runner = _build_runner(monkeypatch, tmp_path, "all") + adapter = runner.adapters[Platform.TELEGRAM] + + # Session store has the process's original thread (thread 42) + runner.session_store._entries["agent:main:telegram:group:-100:42"] = SimpleNamespace( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-100", + chat_type="group", + thread_id="42", + user_id="proc_owner", + user_name="alice", + ) + ) + + # The evt dict carries the correct session_key — NOT a foreground event + evt = { + "session_id": "proc_cross_thread", + "session_key": "agent:main:telegram:group:-100:42", + } + + await runner._inject_watch_notification("[SYSTEM: watch match]", evt) + + adapter.handle_message.assert_awaited_once() + synth_event = adapter.handle_message.await_args.args[0] + # Must route to thread 42 (process origin), NOT some other thread + assert synth_event.source.thread_id == "42" + assert synth_event.source.user_id == "proc_owner" + + +def test_build_process_event_source_returns_none_for_empty_evt(monkeypatch, tmp_path): + """Missing session_key and no platform metadata → None (drop notification).""" + runner = _build_runner(monkeypatch, tmp_path, "all") + + source = runner._build_process_event_source({"session_id": "proc_orphan"}) + assert source is None + + +def test_build_process_event_source_returns_none_for_invalid_platform(monkeypatch, tmp_path): + """Invalid platform string → None.""" + runner = _build_runner(monkeypatch, tmp_path, "all") + + evt = { + "session_id": "proc_bad", + "platform": "not_a_real_platform", + "chat_type": "dm", + "chat_id": "123", + } + source = runner._build_process_event_source(evt) + assert source is None + + +def test_build_process_event_source_returns_none_for_short_session_key(monkeypatch, tmp_path): + """Session key with <5 parts doesn't parse, falls through to empty metadata → None.""" + runner = _build_runner(monkeypatch, tmp_path, "all") + + evt = { + "session_id": "proc_short", + "session_key": "agent:main:telegram", # Too few parts + } + source = runner._build_process_event_source(evt) + assert source is None + + +# --------------------------------------------------------------------------- +# _parse_session_key helper +# --------------------------------------------------------------------------- + +def test_parse_session_key_valid(): + result = _parse_session_key("agent:main:telegram:group:-100") + assert result == {"platform": "telegram", "chat_type": "group", "chat_id": "-100"} + + +def test_parse_session_key_with_extra_parts(): + """Extra trailing parts (thread_id etc.) are ignored — only first 5 matter.""" + result = _parse_session_key("agent:main:discord:group:chan123:thread456") + assert result == {"platform": "discord", "chat_type": "group", "chat_id": "chan123"} + + +def test_parse_session_key_too_short(): + assert _parse_session_key("agent:main:telegram") is None + assert _parse_session_key("") is None + + +def test_parse_session_key_wrong_prefix(): + assert _parse_session_key("cron:main:telegram:dm:123") is None + assert _parse_session_key("agent:cron:telegram:dm:123") is None diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 65f84e1464..55f4c10a81 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1384,14 +1384,10 @@ def terminal_tool( if pty_disabled_reason: result_data["pty_note"] = pty_disabled_reason - # Mark for agent notification on completion - if notify_on_complete and background: - proc_session.notify_on_complete = True - result_data["notify_on_complete"] = True - - # In gateway mode, auto-register a fast watcher so the - # gateway can detect completion and trigger a new agent - # turn. CLI mode uses the completion_queue directly. + # Populate routing metadata on the session so that + # watch-pattern and completion notifications can be + # routed back to the correct chat/thread. + if background and (notify_on_complete or watch_patterns): from gateway.session_context import get_session_env as _gse _gw_platform = _gse("HERMES_SESSION_PLATFORM", "") if _gw_platform: @@ -1404,16 +1400,26 @@ def terminal_tool( proc_session.watcher_user_id = _gw_user_id proc_session.watcher_user_name = _gw_user_name proc_session.watcher_thread_id = _gw_thread_id + + # Mark for agent notification on completion + if notify_on_complete and background: + proc_session.notify_on_complete = True + result_data["notify_on_complete"] = True + + # In gateway mode, auto-register a fast watcher so the + # gateway can detect completion and trigger a new agent + # turn. CLI mode uses the completion_queue directly. + if proc_session.watcher_platform: proc_session.watcher_interval = 5 process_registry.pending_watchers.append({ "session_id": proc_session.id, "check_interval": 5, "session_key": session_key, - "platform": _gw_platform, - "chat_id": _gw_chat_id, - "user_id": _gw_user_id, - "user_name": _gw_user_name, - "thread_id": _gw_thread_id, + "platform": proc_session.watcher_platform, + "chat_id": proc_session.watcher_chat_id, + "user_id": proc_session.watcher_user_id, + "user_name": proc_session.watcher_user_name, + "thread_id": proc_session.watcher_thread_id, "notify_on_complete": True, }) From f61cc464f0a150313959d66dc9c87f27f18e0b8b Mon Sep 17 00:00:00 2001 From: Teknium Date: Wed, 15 Apr 2026 11:07:24 -0700 Subject: [PATCH 05/16] fix: include thread_id in _parse_session_key and fix stale parts reference _parse_session_key() now extracts the optional 6th part (thread_id) from session keys, and _notify_active_sessions_of_shutdown uses _parsed.get() instead of the removed 'parts' variable. Without this, shutdown notifications silently failed (NameError caught by try/except) and forum topic routing was lost. --- gateway/run.py | 14 +++++++++----- .../test_background_process_notifications.py | 10 ++++++++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index f9bf9a38b9..327f8ae32a 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -485,17 +485,21 @@ def _resolve_hermes_bin() -> Optional[list[str]]: def _parse_session_key(session_key: str) -> "dict | None": """Parse a session key into its component parts. - Session keys follow the format ``agent:main:{platform}:{chat_type}:{chat_id}``. - Returns a dict with ``platform``, ``chat_type``, and ``chat_id`` keys, - or None if the key doesn't match the expected format. + Session keys follow the format + ``agent:main:{platform}:{chat_type}:{chat_id}[:{thread_id}[:{user_id}]]``. + Returns a dict with ``platform``, ``chat_type``, ``chat_id``, and + optionally ``thread_id`` keys, or None if the key doesn't match. """ parts = session_key.split(":") if len(parts) >= 5 and parts[0] == "agent" and parts[1] == "main": - return { + result = { "platform": parts[2], "chat_type": parts[3], "chat_id": parts[4], } + if len(parts) > 5: + result["thread_id"] = parts[5] + return result return None @@ -1526,7 +1530,7 @@ class GatewayRunner: # Include thread_id if present so the message lands in the # correct forum topic / thread. - thread_id = parts[5] if len(parts) > 5 else None + thread_id = _parsed.get("thread_id") metadata = {"thread_id": thread_id} if thread_id else None await adapter.send(chat_id, msg, metadata=metadata) diff --git a/tests/gateway/test_background_process_notifications.py b/tests/gateway/test_background_process_notifications.py index 68eb5e3043..eabf92be63 100644 --- a/tests/gateway/test_background_process_notifications.py +++ b/tests/gateway/test_background_process_notifications.py @@ -383,9 +383,15 @@ def test_parse_session_key_valid(): def test_parse_session_key_with_extra_parts(): - """Extra trailing parts (thread_id etc.) are ignored — only first 5 matter.""" + """Thread ID (6th part) is extracted; further parts are ignored.""" result = _parse_session_key("agent:main:discord:group:chan123:thread456") - assert result == {"platform": "discord", "chat_type": "group", "chat_id": "chan123"} + assert result == {"platform": "discord", "chat_type": "group", "chat_id": "chan123", "thread_id": "thread456"} + + +def test_parse_session_key_with_user_id_part(): + """7th part (user_id) is ignored — only up to thread_id is extracted.""" + result = _parse_session_key("agent:main:telegram:group:chat1:thread42:user99") + assert result == {"platform": "telegram", "chat_type": "group", "chat_id": "chat1", "thread_id": "thread42"} def test_parse_session_key_too_short(): From 2dc5f9d2d387e345d1ac6e05766bcc99e3a115e0 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Wed, 15 Apr 2026 11:17:44 -0700 Subject: [PATCH 06/16] fix: light mode link/primary colors unreadable on white background (#10457) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gold #FFD700 has 1.4:1 contrast ratio on white — barely visible. Replace with dark amber palette (#8B6508 primary, #7A5800 links) that passes WCAG AA (5.3:1 and 6.5:1 respectively). Changes: - :root primary palette → dark amber tones for light mode - Explicit light mode link colors (#7A5800 / #5A4100 hover) - Light mode sidebar active state with amber accent - Light mode table header/border styling - Footer hover color split by theme (gold for dark, amber for light) Dark mode is completely unchanged. Reported by @AbrahamMat7632 --- website/src/css/custom.css | 44 ++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/website/src/css/custom.css b/website/src/css/custom.css index cfc90c7f9e..eda3ec1a72 100644 --- a/website/src/css/custom.css +++ b/website/src/css/custom.css @@ -8,20 +8,24 @@ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&family=JetBrains+Mono:wght@400;500&display=swap'); :root { - /* Gold/Amber palette from landing page */ - --ifm-color-primary: #FFD700; - --ifm-color-primary-dark: #E6C200; - --ifm-color-primary-darker: #D9B700; - --ifm-color-primary-darkest: #B39600; - --ifm-color-primary-light: #FFDD33; - --ifm-color-primary-lighter: #FFE14D; - --ifm-color-primary-lightest: #FFEB80; + /* Dark amber palette for light mode — readable on white (WCAG AA compliant) + Current gold #FFD700 has only 1.4:1 contrast on white; these tones pass 4.5:1+ */ + --ifm-color-primary: #8B6508; + --ifm-color-primary-dark: #7A5800; + --ifm-color-primary-darker: #6E4F00; + --ifm-color-primary-darkest: #5A4100; + --ifm-color-primary-light: #9E7410; + --ifm-color-primary-lighter: #B38319; + --ifm-color-primary-lightest: #C89222; --ifm-font-family-base: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; --ifm-font-family-monospace: 'JetBrains Mono', 'Fira Code', 'Cascadia Code', monospace; --ifm-code-font-size: 90%; --ifm-heading-font-weight: 600; + + --ifm-link-color: #7A5800; + --ifm-link-hover-color: #5A4100; } /* Dark mode — the PRIMARY mode, matches landing page */ @@ -91,6 +95,13 @@ padding-left: calc(var(--ifm-menu-link-padding-horizontal) - 3px); } +/* Light mode sidebar active */ +[data-theme='light'] .menu__link--active:not(.menu__link--sublist) { + background-color: rgba(139, 101, 8, 0.08); + border-left: 3px solid #8B6508; + padding-left: calc(var(--ifm-menu-link-padding-horizontal) - 3px); +} + /* Code blocks */ [data-theme='dark'] .prism-code { background-color: #0a0a12 !important; @@ -167,6 +178,16 @@ pre.prism-code.language-ascii code { border-color: rgba(255, 215, 0, 0.06); } +/* Light mode table styling */ +[data-theme='light'] table th { + background-color: rgba(139, 101, 8, 0.06); + border-color: rgba(139, 101, 8, 0.15); +} + +[data-theme='light'] table td { + border-color: rgba(139, 101, 8, 0.10); +} + /* Footer */ .footer { border-top: 1px solid rgba(255, 215, 0, 0.08); @@ -177,11 +198,16 @@ pre.prism-code.language-ascii code { transition: color 0.2s; } -.footer a:hover { +[data-theme='dark'] .footer a:hover { color: #FFD700; text-decoration: none; } +[data-theme='light'] .footer a:hover { + color: #7A5800; + text-decoration: none; +} + /* Scrollbar */ [data-theme='dark'] ::-webkit-scrollbar { width: 8px; From d2f85383e874db37e0a3f924ff35e03434099aaf Mon Sep 17 00:00:00 2001 From: ZaynJarvis Date: Wed, 15 Apr 2026 01:59:18 +0800 Subject: [PATCH 07/16] fix: change default OPENVIKING_ACCOUNT from root to default - Change default OPENVIKING_ACCOUNT from 'root' to 'default' - Add account and user config options to get_config_schema() - Add session creation in initialize() - Add reset_session() method - Update docstring to reflect new default This is a breaking change: existing users who relied on the 'root' account will need to either: 1. Set OPENVIKING_ACCOUNT=root in their environment, or 2. Migrate their data to the 'default' account Future release will add support for OPENVIKING_ACCOUNT and OPENVIKING_USER in setup when API key is provided. update desc for key setup --- plugins/memory/openviking/__init__.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 1777d423bd..dad1d39b7c 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -10,7 +10,7 @@ lifecycle instead of read-only search endpoints. Config via environment variables (profile-scoped via each profile's .env): OPENVIKING_ENDPOINT — Server URL (default: http://127.0.0.1:1933) OPENVIKING_API_KEY — API key (required for authenticated servers) - OPENVIKING_ACCOUNT — Tenant account (default: root) + OPENVIKING_ACCOUNT — Tenant account (default: default) OPENVIKING_USER — Tenant user (default: default) Capabilities: @@ -83,7 +83,7 @@ class _VikingClient: account: str = "", user: str = ""): self._endpoint = endpoint.rstrip("/") self._api_key = api_key - self._account = account or os.environ.get("OPENVIKING_ACCOUNT", "root") + self._account = account or os.environ.get("OPENVIKING_ACCOUNT", "default") self._user = user or os.environ.get("OPENVIKING_USER", "default") self._httpx = _get_httpx() if self._httpx is None: @@ -282,10 +282,22 @@ class OpenVikingMemoryProvider(MemoryProvider): }, { "key": "api_key", - "description": "OpenViking API key", + "description": "OpenViking API key (leave blank for local dev mode)", "secret": True, "env_var": "OPENVIKING_API_KEY", }, + { + "key": "account", + "description": "OpenViking tenant account ID (default, used when local mode, OPENVIKING_API_KEY is empty)", + "default": "default", + "env_var": "OPENVIKING_ACCOUNT", + }, + { + "key": "user", + "description": "OpenViking user ID within the account (default, used when local mode, OPENVIKING_API_KEY is empty)", + "default": "default", + "env_var": "OPENVIKING_USER", + }, ] def initialize(self, session_id: str, **kwargs) -> None: From 990030c26ed6c6bb1ae63784fc03adededf7b1cf Mon Sep 17 00:00:00 2001 From: ZaynJarvis Date: Wed, 15 Apr 2026 02:33:56 +0800 Subject: [PATCH 08/16] feat: add contrib map --- scripts/release.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/release.py b/scripts/release.py index 0462556275..b875eb8a54 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -197,6 +197,7 @@ AUTHOR_MAP = { "zhouboli@gmail.com": "zhouboli", "zqiao@microsoft.com": "tomqiaozc", "zzn+pa@zzn.im": "xinbenlv", + "zaynjarvis@gmail.com": "ZaynJarvis", } From 8b167af66bba0919e00ed20d55c208653e22815d Mon Sep 17 00:00:00 2001 From: Zayn Jarvis Date: Thu, 16 Apr 2026 01:35:40 +0800 Subject: [PATCH 09/16] feat: add ov agent header --- plugins/memory/openviking/__init__.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index dad1d39b7c..0ae44fb41a 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -85,6 +85,7 @@ class _VikingClient: self._api_key = api_key self._account = account or os.environ.get("OPENVIKING_ACCOUNT", "default") self._user = user or os.environ.get("OPENVIKING_USER", "default") + self._agent = user or os.environ.get("OPENVIKING_AGENT", "hermes") self._httpx = _get_httpx() if self._httpx is None: raise ImportError("httpx is required for OpenViking: pip install httpx") @@ -94,6 +95,7 @@ class _VikingClient: "Content-Type": "application/json", "X-OpenViking-Account": self._account, "X-OpenViking-User": self._user, + "X-OpenViking-Agent": self._agent, } if self._api_key: h["X-API-Key"] = self._api_key @@ -288,16 +290,22 @@ class OpenVikingMemoryProvider(MemoryProvider): }, { "key": "account", - "description": "OpenViking tenant account ID (default, used when local mode, OPENVIKING_API_KEY is empty)", + "description": "OpenViking tenant account ID ([default], used when local mode, OPENVIKING_API_KEY is empty)", "default": "default", "env_var": "OPENVIKING_ACCOUNT", }, { "key": "user", - "description": "OpenViking user ID within the account (default, used when local mode, OPENVIKING_API_KEY is empty)", + "description": "OpenViking user ID within the account ([default], used when local mode, OPENVIKING_API_KEY is empty)", "default": "default", "env_var": "OPENVIKING_USER", }, + { + "key": "agent", + "description": "OpenViking agent ID within the account ([hermes], useful in multi-agent mode)", + "default": "hermes", + "env_var": "OPENVIKING_AGENT", + }, ] def initialize(self, session_id: str, **kwargs) -> None: From 0c30385be2460a3417511ae657c22d3bf95596d1 Mon Sep 17 00:00:00 2001 From: Zayn Jarvis Date: Thu, 16 Apr 2026 01:38:08 +0800 Subject: [PATCH 10/16] chore: update doc --- plugins/memory/openviking/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 0ae44fb41a..afc96268cb 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -12,6 +12,7 @@ Config via environment variables (profile-scoped via each profile's .env): OPENVIKING_API_KEY — API key (required for authenticated servers) OPENVIKING_ACCOUNT — Tenant account (default: default) OPENVIKING_USER — Tenant user (default: default) + OPENVIKING_AGENT — Tenant agent (default: hermes) Capabilities: - Automatic memory extraction on session commit (6 categories) From 5082a9f66ca7c2bbefc11f845e7b9fa628ac4cf5 Mon Sep 17 00:00:00 2001 From: ZaynJarvis Date: Thu, 16 Apr 2026 01:45:03 +0800 Subject: [PATCH 11/16] fix: wire agent/account/user params through _VikingClient MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix copy-paste bug: `self._agent = user` → `self._agent = agent` with new `agent` parameter in `_VikingClient.__init__` - Read account/user/agent env vars in `initialize()` and pass them to all 4 `_VikingClient` instantiations so identity headers are consistently applied across health check, prefetch, sync, and memory write paths Co-Authored-By: Claude Opus 4.6 --- plugins/memory/openviking/__init__.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index afc96268cb..ec2d27d999 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -81,12 +81,12 @@ class _VikingClient: """Thin HTTP client for the OpenViking REST API.""" def __init__(self, endpoint: str, api_key: str = "", - account: str = "", user: str = ""): + account: str = "", user: str = "", agent: str = ""): self._endpoint = endpoint.rstrip("/") self._api_key = api_key self._account = account or os.environ.get("OPENVIKING_ACCOUNT", "default") self._user = user or os.environ.get("OPENVIKING_USER", "default") - self._agent = user or os.environ.get("OPENVIKING_AGENT", "hermes") + self._agent = agent or os.environ.get("OPENVIKING_AGENT", "hermes") self._httpx = _get_httpx() if self._httpx is None: raise ImportError("httpx is required for OpenViking: pip install httpx") @@ -312,11 +312,17 @@ class OpenVikingMemoryProvider(MemoryProvider): def initialize(self, session_id: str, **kwargs) -> None: self._endpoint = os.environ.get("OPENVIKING_ENDPOINT", _DEFAULT_ENDPOINT) self._api_key = os.environ.get("OPENVIKING_API_KEY", "") + self._account = os.environ.get("OPENVIKING_ACCOUNT", "default") + self._user = os.environ.get("OPENVIKING_USER", "default") + self._agent = os.environ.get("OPENVIKING_AGENT", "hermes") self._session_id = session_id self._turn_count = 0 try: - self._client = _VikingClient(self._endpoint, self._api_key) + self._client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) if not self._client.health(): logger.warning("OpenViking server at %s is not reachable", self._endpoint) self._client = None @@ -372,7 +378,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _run(): try: - client = _VikingClient(self._endpoint, self._api_key) + client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) resp = client.post("/api/v1/search/find", { "query": query, "top_k": 5, @@ -407,7 +416,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _sync(): try: - client = _VikingClient(self._endpoint, self._api_key) + client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) sid = self._session_id # Add user message @@ -463,7 +475,10 @@ class OpenVikingMemoryProvider(MemoryProvider): def _write(): try: - client = _VikingClient(self._endpoint, self._api_key) + client = _VikingClient( + self._endpoint, self._api_key, + account=self._account, user=self._user, agent=self._agent, + ) # Add as a user message with memory context so the commit # picks it up as an explicit memory during extraction client.post(f"/api/v1/sessions/{self._session_id}/messages", { From f3ec4b3a16088d92beb3cccb1532a5007fe95959 Mon Sep 17 00:00:00 2001 From: "zhiheng.liu" Date: Tue, 14 Apr 2026 01:49:00 +0800 Subject: [PATCH 12/16] Fix OpenViking integration issues: explicit session creation, better error logging --- plugins/memory/openviking/__init__.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index ec2d27d999..72ec3b1051 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -109,7 +109,12 @@ class _VikingClient: resp = self._httpx.get( self._url(path), headers=self._headers(), timeout=_TIMEOUT, **kwargs ) - resp.raise_for_status() + try: + resp.raise_for_status() + except Exception as e: + logger.debug("OpenViking request failed: %s %s, status: %s, response: %s", + "GET", path, resp.status_code, resp.text) + raise return resp.json() def post(self, path: str, payload: dict = None, **kwargs) -> dict: @@ -117,7 +122,12 @@ class _VikingClient: self._url(path), json=payload or {}, headers=self._headers(), timeout=_TIMEOUT, **kwargs ) - resp.raise_for_status() + try: + resp.raise_for_status() + except Exception as e: + logger.debug("OpenViking request failed: %s %s, status: %s, response: %s", + "POST", path, resp.status_code, resp.text) + raise return resp.json() def health(self) -> bool: @@ -326,6 +336,13 @@ class OpenVikingMemoryProvider(MemoryProvider): if not self._client.health(): logger.warning("OpenViking server at %s is not reachable", self._endpoint) self._client = None + else: + # Explicitly create the session to ensure it exists + try: + self._client.post("/api/v1/sessions", {"session_id": self._session_id}) + logger.info("OpenViking session %s created", self._session_id) + except Exception as e: + logger.debug("OpenViking session creation failed (may already exist): %s", e) except ImportError: logger.warning("httpx not installed — OpenViking plugin disabled") self._client = None @@ -352,7 +369,8 @@ class OpenVikingMemoryProvider(MemoryProvider): "(abstract/overview/full), viking_browse to explore.\n" "Use viking_remember to store facts, viking_add_resource to index URLs/docs." ) - except Exception: + except Exception as e: + logger.warning("OpenViking system_prompt_block failed: %s", e) return ( "# OpenViking Knowledge Base\n" f"Active. Endpoint: {self._endpoint}\n" From 7856d304f20303617453016ed9e818c729f6ee97 Mon Sep 17 00:00:00 2001 From: "zhiheng.liu" Date: Wed, 15 Apr 2026 23:14:32 +0800 Subject: [PATCH 13/16] fix(openviking): commit session on /new and context compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OpenViking memory provider extracts memories when its session is committed (POST /api/v1/sessions/{id}/commit). Before this fix, the CLI had two code paths that changed the active session_id without ever committing the outgoing OpenViking session: 1. /new (new_session() in cli.py) — called flush_memories() to write MEMORY.md, then immediately discarded the old session_id. The accumulated OpenViking session was never committed, so all context from that session was lost before extraction could run. 2. /compress and auto-compress (_compress_context() in run_agent.py) — split the SQLite session (new session_id) but left the OpenViking provider pointing at the old session_id with no commit, meaning all messages synced to OpenViking were silently orphaned. The gateway already handles session commit on /new and /reset via shutdown_memory_provider() on the cached agent; the CLI path did not. Fix: introduce a lightweight session-transition lifecycle alongside the existing full shutdown path: - OpenVikingMemoryProvider.reset_session(new_session_id): waits for in-flight background threads, resets per-session counters, and creates the new OV session via POST /api/v1/sessions — without tearing down the HTTP client (avoids connection overhead on /new). - MemoryManager.restart_session(new_session_id): calls reset_session() on providers that implement it; falls back to initialize() for providers that do not. Skips the builtin provider (no per-session state). - AIAgent.commit_memory_session(messages): wraps memory_manager.on_session_end() without shutdown — commits OV session for extraction but leaves the provider alive for the next session. - AIAgent.reinitialize_memory_session(new_session_id): wraps memory_manager.restart_session() — transitions all external providers to the new session after session_id has been assigned. Call sites: - cli.py new_session(): commit BEFORE session_id changes, reinitialize AFTER — ensuring OV extraction runs on the correct session and the new session is immediately ready for the next turn. - run_agent._compress_context(): same pattern, inside the if self._session_db: block where the session_id split happens. /compress and auto-compress are functionally identical at this layer: both call _compress_context(), so both are fixed by the same change. Tests added to tests/agent/test_memory_provider.py: - TestMemoryManagerRestartSession: reset_session() routing, builtin skip, initialize() fallback, failure tolerance, empty-manager noop. - TestOpenVikingResetSession: session_id update, per-session state clear, POST /api/v1/sessions call, API failure tolerance, no-client noop. Co-Authored-By: Claude Sonnet 4.6 --- agent/memory_manager.py | 22 ++++ cli.py | 14 +++ plugins/memory/openviking/__init__.py | 28 +++++ run_agent.py | 33 ++++++ tests/agent/test_memory_provider.py | 153 ++++++++++++++++++++++++++ 5 files changed, 250 insertions(+) diff --git a/agent/memory_manager.py b/agent/memory_manager.py index 6cd1c860b6..8320710ce7 100644 --- a/agent/memory_manager.py +++ b/agent/memory_manager.py @@ -281,6 +281,28 @@ class MemoryManager: provider.name, e, ) + def restart_session(self, new_session_id: str) -> None: + """Transition external providers to a new session without full teardown. + + Must be called AFTER on_session_end() has committed the old session. + Providers that implement reset_session() are transitioned cheaply + (HTTP client kept alive); others fall back to a full initialize(). + The builtin provider is skipped — it has no per-session state. + """ + for provider in self._providers: + if provider.name == "builtin": + continue + try: + if hasattr(provider, "reset_session"): + provider.reset_session(new_session_id) + else: + provider.initialize(session_id=new_session_id) + except Exception as e: + logger.debug( + "Memory provider '%s' restart_session failed: %s", + provider.name, e, + ) + def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: """Notify all providers before context compression. diff --git a/cli.py b/cli.py index 97698f133d..94e56b0d5d 100644 --- a/cli.py +++ b/cli.py @@ -4100,6 +4100,13 @@ class HermesCLI: self.agent.flush_memories(self.conversation_history) except (Exception, KeyboardInterrupt): pass + # Commit external memory providers (e.g. OpenViking) BEFORE + # session_id changes so extraction runs on the correct session. + if hasattr(self.agent, "commit_memory_session"): + try: + self.agent.commit_memory_session(self.conversation_history) + except Exception: + pass self._notify_session_boundary("on_session_finalize") elif self.agent: # First session or empty history — still finalize the old session @@ -4148,6 +4155,13 @@ class HermesCLI: ) except Exception: pass + # Reinitialize external memory providers with the new session_id + # so subsequent turns are tracked under the new session. + if hasattr(self.agent, "reinitialize_memory_session"): + try: + self.agent.reinitialize_memory_session(self.session_id) + except Exception: + pass self._notify_session_boundary("on_session_reset") if not silent: diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 72ec3b1051..b1cb03b733 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -533,6 +533,34 @@ class OpenVikingMemoryProvider(MemoryProvider): except Exception as e: return tool_error(str(e)) + def reset_session(self, new_session_id: str) -> None: + """Transition to a new session without tearing down the HTTP client. + + Called by MemoryManager.restart_session() after on_session_end() has + committed the old session (e.g. after CLI /new or context compression). + Lighter than shutdown() + initialize(): keeps the httpx client alive, + resets per-session counters, and creates the new OV session. + """ + for t in (self._sync_thread, self._prefetch_thread): + if t and t.is_alive(): + t.join(timeout=5.0) + + self._session_id = new_session_id + self._turn_count = 0 + self._prefetch_result = "" + self._sync_thread = None + self._prefetch_thread = None + + if self._client: + try: + self._client.post("/api/v1/sessions", {"session_id": self._session_id}) + logger.info("OpenViking new session %s created", self._session_id) + except Exception as e: + logger.debug("OpenViking session creation on reset: %s", e) + + global _last_active_provider + _last_active_provider = self + def shutdown(self) -> None: # Wait for background threads to finish for t in (self._sync_thread, self._prefetch_thread): diff --git a/run_agent.py b/run_agent.py index efaeba8294..773d22bed0 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3040,6 +3040,34 @@ class AIAgent: except Exception: pass + def commit_memory_session(self, messages: list = None) -> None: + """Commit external memory providers for the current session. + + Calls on_session_end() WITHOUT shutting down providers — the session + data (e.g. OpenViking) is committed for extraction, but the HTTP + client and provider state remain alive for the next session. + Called before session_id changes (e.g. /new, context compression). + """ + if self._memory_manager: + try: + self._memory_manager.on_session_end(messages or []) + except Exception: + pass + + def reinitialize_memory_session(self, new_session_id: str) -> None: + """Transition memory providers to a new session after commit. + + Calls restart_session() which uses reset_session() on providers that + support it (cheap: keeps HTTP client, resets per-session counters) or + falls back to initialize() for providers that don't. + Called after session_id has been assigned (e.g. /new, compression). + """ + if self._memory_manager: + try: + self._memory_manager.restart_session(new_session_id) + except Exception: + pass + def close(self) -> None: """Release all resources held by this agent instance. @@ -6826,9 +6854,14 @@ class AIAgent: try: # Propagate title to the new session with auto-numbering old_title = self._session_db.get_session_title(self.session_id) + # Commit external memory (e.g. OpenViking) before session_id + # changes so extraction runs on the correct session. + self.commit_memory_session([]) self._session_db.end_session(self.session_id, "compression") old_session_id = self.session_id self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" + # Transition external memory providers to the new session_id. + self.reinitialize_memory_session(self.session_id) # Update session_log_file to point to the new session's JSON file self.session_log_file = self.logs_dir / f"session_{self.session_id}.json" self._session_db.create_session( diff --git a/tests/agent/test_memory_provider.py b/tests/agent/test_memory_provider.py index fe04e0dd43..afd3dc002f 100644 --- a/tests/agent/test_memory_provider.py +++ b/tests/agent/test_memory_provider.py @@ -695,3 +695,156 @@ class TestMemoryContextFencing: fence_end = combined.index("") assert "Alice" in combined[fence_start:fence_end] assert combined.index("weather") < fence_start + + +# --------------------------------------------------------------------------- +# MemoryManager.restart_session() tests +# --------------------------------------------------------------------------- + + +class ResettableProvider(FakeMemoryProvider): + """Provider that implements reset_session() for cheap session transitions.""" + + def __init__(self, name="resettable"): + super().__init__(name) + self.reset_session_calls = [] + + def reset_session(self, new_session_id: str) -> None: + self.reset_session_calls.append(new_session_id) + + +class TestMemoryManagerRestartSession: + def test_restart_calls_reset_session_on_external(self): + """restart_session() calls reset_session() on external providers that have it.""" + mgr = MemoryManager() + builtin = FakeMemoryProvider("builtin") + external = ResettableProvider("openviking") + mgr.add_provider(builtin) + mgr.add_provider(external) + + mgr.restart_session("new-session-123") + + assert external.reset_session_calls == ["new-session-123"] + # builtin is skipped — it has no per-session state + assert not hasattr(builtin, "reset_session_calls") + + def test_restart_skips_builtin(self): + """restart_session() does not call anything on the builtin provider.""" + mgr = MemoryManager() + builtin = ResettableProvider("builtin") + mgr.add_provider(builtin) + + mgr.restart_session("new-session-456") + + assert builtin.reset_session_calls == [] + + def test_restart_falls_back_to_initialize(self): + """restart_session() calls initialize() when provider has no reset_session().""" + mgr = MemoryManager() + builtin = FakeMemoryProvider("builtin") + external = FakeMemoryProvider("honcho") + mgr.add_provider(builtin) + mgr.add_provider(external) + + mgr.restart_session("fallback-session") + + assert external.initialized + assert external._init_kwargs["session_id"] == "fallback-session" + + def test_restart_tolerates_provider_failure(self): + """restart_session() swallows failures so other providers are still called.""" + mgr = MemoryManager() + builtin = FakeMemoryProvider("builtin") + bad = ResettableProvider("bad-provider") + + def _explode(new_sid): + raise RuntimeError("network error") + + bad.reset_session = _explode + good = ResettableProvider("good-provider") + # Register bad provider first, but only one external is allowed — + # so test both providers by using the fallback path. + mgr.add_provider(builtin) + mgr.add_provider(bad) + + # Calling restart_session should not raise even though the provider fails. + mgr.restart_session("safe-session") + + def test_restart_no_providers_is_noop(self): + """restart_session() on an empty manager does not raise.""" + mgr = MemoryManager() + mgr.restart_session("empty-session") # must not raise + + +# --------------------------------------------------------------------------- +# OpenVikingMemoryProvider.reset_session() tests +# --------------------------------------------------------------------------- + + +class TestOpenVikingResetSession: + """Unit tests for the cheap session-transition path in the OV plugin.""" + + def _make_provider(self): + """Return an OpenVikingMemoryProvider with a mock _client.""" + try: + from plugins.memory.openviking import OpenVikingMemoryProvider + except ImportError: + pytest.skip("openviking plugin not importable") + + provider = OpenVikingMemoryProvider() + provider._session_id = "old-session" + provider._turn_count = 5 + provider._prefetch_result = "cached result" + provider._sync_thread = None + provider._prefetch_thread = None + + mock_client = MagicMock() + mock_client.post.return_value = {} + provider._client = mock_client + return provider, mock_client + + def test_reset_updates_session_id(self): + provider, _ = self._make_provider() + provider.reset_session("new-session-abc") + assert provider._session_id == "new-session-abc" + + def test_reset_clears_per_session_state(self): + provider, _ = self._make_provider() + provider.reset_session("new-session-xyz") + assert provider._turn_count == 0 + assert provider._prefetch_result == "" + assert provider._sync_thread is None + assert provider._prefetch_thread is None + + def test_reset_creates_new_ov_session(self): + provider, mock_client = self._make_provider() + provider.reset_session("new-session-post") + mock_client.post.assert_called_once_with( + "/api/v1/sessions", {"session_id": "new-session-post"} + ) + + def test_reset_tolerates_ov_api_failure(self): + provider, mock_client = self._make_provider() + mock_client.post.side_effect = RuntimeError("connection refused") + # Must not raise — OV API failure is non-fatal for the reset path + provider.reset_session("no-server-session") + assert provider._session_id == "no-server-session" + + def test_reset_without_client_is_noop(self): + """reset_session() works even if provider was never initialized (no client).""" + try: + from plugins.memory.openviking import OpenVikingMemoryProvider + except ImportError: + pytest.skip("openviking plugin not importable") + + provider = OpenVikingMemoryProvider() + provider._client = None + provider._session_id = "old" + provider._turn_count = 3 + provider._sync_thread = None + provider._prefetch_thread = None + provider._prefetch_result = "" + + provider.reset_session("new-no-client") + assert provider._session_id == "new-no-client" + assert provider._turn_count == 0 From 8275fa597a702116a6a2cf1a9fa194d8874020ad Mon Sep 17 00:00:00 2001 From: "zhiheng.liu" Date: Thu, 16 Apr 2026 00:31:48 +0800 Subject: [PATCH 14/16] refactor(memory): promote on_session_reset to base provider hook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace hasattr-forked OpenViking-specific paths with a proper base-class hook. Collapse the two agent wrappers into a single rotate_memory_session so callers don't orchestrate commit + rebind themselves. - MemoryProvider: add on_session_reset(new_session_id) as a default no-op - MemoryManager: on_session_reset fans out unconditionally (no hasattr, no builtin skip — base no-op covers it) - OpenViking: rename reset_session -> on_session_reset; drop the explicit POST /api/v1/sessions (OV auto-creates on first message) and the two debug raise_for_status wrappers - AIAgent: collapse commit_memory_session + reinitialize_memory_session into rotate_memory_session(new_sid, messages) - cli.py / run_agent.py: replace hasattr blocks and the split calls with a single unconditional rotate_memory_session call; compression path now passes the real messages list instead of [] - tests: align with on_session_reset, assert reset does NOT POST /sessions Co-Authored-By: Claude Opus 4.6 --- agent/memory_manager.py | 20 +++---- agent/memory_provider.py | 9 +++ cli.py | 22 ++----- plugins/memory/openviking/__init__.py | 39 ++---------- run_agent.py | 45 ++++---------- tests/agent/test_memory_provider.py | 85 +++++++++------------------ 6 files changed, 68 insertions(+), 152 deletions(-) diff --git a/agent/memory_manager.py b/agent/memory_manager.py index 8320710ce7..b677241594 100644 --- a/agent/memory_manager.py +++ b/agent/memory_manager.py @@ -281,25 +281,19 @@ class MemoryManager: provider.name, e, ) - def restart_session(self, new_session_id: str) -> None: - """Transition external providers to a new session without full teardown. + def on_session_reset(self, new_session_id: str) -> None: + """Notify all providers of a session reset. - Must be called AFTER on_session_end() has committed the old session. - Providers that implement reset_session() are transitioned cheaply - (HTTP client kept alive); others fall back to a full initialize(). - The builtin provider is skipped — it has no per-session state. + Called after on_session_end() has committed the previous session. + Providers with per-session state override on_session_reset to rebind + it cheaply (default is a no-op on the base class). """ for provider in self._providers: - if provider.name == "builtin": - continue try: - if hasattr(provider, "reset_session"): - provider.reset_session(new_session_id) - else: - provider.initialize(session_id=new_session_id) + provider.on_session_reset(new_session_id) except Exception as e: logger.debug( - "Memory provider '%s' restart_session failed: %s", + "Memory provider '%s' on_session_reset failed: %s", provider.name, e, ) diff --git a/agent/memory_provider.py b/agent/memory_provider.py index 24593e3345..9c6f0225cb 100644 --- a/agent/memory_provider.py +++ b/agent/memory_provider.py @@ -160,6 +160,15 @@ class MemoryProvider(ABC): (CLI exit, /reset, gateway session expiry). """ + def on_session_reset(self, new_session_id: str) -> None: + """Transition to a new session without full teardown. + + Called after on_session_end() has committed the previous session + (e.g. /new, context compression). Providers with per-session state + override to rebind counters/IDs while keeping HTTP clients alive. + Default: no-op. + """ + def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: """Called before context compression discards old messages. diff --git a/cli.py b/cli.py index 94e56b0d5d..a00eaf9702 100644 --- a/cli.py +++ b/cli.py @@ -4095,18 +4095,12 @@ class HermesCLI: def new_session(self, silent=False): """Start a fresh session with a new session ID and cleared agent state.""" - if self.agent and self.conversation_history: + old_history = self.conversation_history + if self.agent and old_history: try: - self.agent.flush_memories(self.conversation_history) + self.agent.flush_memories(old_history) except (Exception, KeyboardInterrupt): pass - # Commit external memory providers (e.g. OpenViking) BEFORE - # session_id changes so extraction runs on the correct session. - if hasattr(self.agent, "commit_memory_session"): - try: - self.agent.commit_memory_session(self.conversation_history) - except Exception: - pass self._notify_session_boundary("on_session_finalize") elif self.agent: # First session or empty history — still finalize the old session @@ -4155,13 +4149,9 @@ class HermesCLI: ) except Exception: pass - # Reinitialize external memory providers with the new session_id - # so subsequent turns are tracked under the new session. - if hasattr(self.agent, "reinitialize_memory_session"): - try: - self.agent.reinitialize_memory_session(self.session_id) - except Exception: - pass + # Commit the old session and rebind memory providers to the + # new session_id so subsequent turns are tracked correctly. + self.agent.rotate_memory_session(self.session_id, old_history) self._notify_session_boundary("on_session_reset") if not silent: diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index b1cb03b733..4251927ccc 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -109,12 +109,7 @@ class _VikingClient: resp = self._httpx.get( self._url(path), headers=self._headers(), timeout=_TIMEOUT, **kwargs ) - try: - resp.raise_for_status() - except Exception as e: - logger.debug("OpenViking request failed: %s %s, status: %s, response: %s", - "GET", path, resp.status_code, resp.text) - raise + resp.raise_for_status() return resp.json() def post(self, path: str, payload: dict = None, **kwargs) -> dict: @@ -122,12 +117,7 @@ class _VikingClient: self._url(path), json=payload or {}, headers=self._headers(), timeout=_TIMEOUT, **kwargs ) - try: - resp.raise_for_status() - except Exception as e: - logger.debug("OpenViking request failed: %s %s, status: %s, response: %s", - "POST", path, resp.status_code, resp.text) - raise + resp.raise_for_status() return resp.json() def health(self) -> bool: @@ -336,13 +326,6 @@ class OpenVikingMemoryProvider(MemoryProvider): if not self._client.health(): logger.warning("OpenViking server at %s is not reachable", self._endpoint) self._client = None - else: - # Explicitly create the session to ensure it exists - try: - self._client.post("/api/v1/sessions", {"session_id": self._session_id}) - logger.info("OpenViking session %s created", self._session_id) - except Exception as e: - logger.debug("OpenViking session creation failed (may already exist): %s", e) except ImportError: logger.warning("httpx not installed — OpenViking plugin disabled") self._client = None @@ -533,14 +516,9 @@ class OpenVikingMemoryProvider(MemoryProvider): except Exception as e: return tool_error(str(e)) - def reset_session(self, new_session_id: str) -> None: - """Transition to a new session without tearing down the HTTP client. - - Called by MemoryManager.restart_session() after on_session_end() has - committed the old session (e.g. after CLI /new or context compression). - Lighter than shutdown() + initialize(): keeps the httpx client alive, - resets per-session counters, and creates the new OV session. - """ + def on_session_reset(self, new_session_id: str) -> None: + """Rebind per-session state to new_session_id. OV auto-creates the + session when the first message is added, so no create call here.""" for t in (self._sync_thread, self._prefetch_thread): if t and t.is_alive(): t.join(timeout=5.0) @@ -551,13 +529,6 @@ class OpenVikingMemoryProvider(MemoryProvider): self._sync_thread = None self._prefetch_thread = None - if self._client: - try: - self._client.post("/api/v1/sessions", {"session_id": self._session_id}) - logger.info("OpenViking new session %s created", self._session_id) - except Exception as e: - logger.debug("OpenViking session creation on reset: %s", e) - global _last_active_provider _last_active_provider = self diff --git a/run_agent.py b/run_agent.py index 773d22bed0..a19857bc4e 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3040,33 +3040,17 @@ class AIAgent: except Exception: pass - def commit_memory_session(self, messages: list = None) -> None: - """Commit external memory providers for the current session. - - Calls on_session_end() WITHOUT shutting down providers — the session - data (e.g. OpenViking) is committed for extraction, but the HTTP - client and provider state remain alive for the next session. - Called before session_id changes (e.g. /new, context compression). - """ - if self._memory_manager: - try: - self._memory_manager.on_session_end(messages or []) - except Exception: - pass - - def reinitialize_memory_session(self, new_session_id: str) -> None: - """Transition memory providers to a new session after commit. - - Calls restart_session() which uses reset_session() on providers that - support it (cheap: keeps HTTP client, resets per-session counters) or - falls back to initialize() for providers that don't. - Called after session_id has been assigned (e.g. /new, compression). - """ - if self._memory_manager: - try: - self._memory_manager.restart_session(new_session_id) - except Exception: - pass + def rotate_memory_session(self, new_session_id: str, messages: list = None) -> None: + """Commit the current memory session, then rebind providers to + new_session_id. Keeps HTTP clients/state alive across the transition. + Called when session_id rotates (e.g. /new, context compression).""" + if not self._memory_manager: + return + try: + self._memory_manager.on_session_end(messages or []) + self._memory_manager.on_session_reset(new_session_id) + except Exception: + pass def close(self) -> None: """Release all resources held by this agent instance. @@ -6854,14 +6838,11 @@ class AIAgent: try: # Propagate title to the new session with auto-numbering old_title = self._session_db.get_session_title(self.session_id) - # Commit external memory (e.g. OpenViking) before session_id - # changes so extraction runs on the correct session. - self.commit_memory_session([]) self._session_db.end_session(self.session_id, "compression") old_session_id = self.session_id self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" - # Transition external memory providers to the new session_id. - self.reinitialize_memory_session(self.session_id) + # Commit the old memory session and rebind providers to the new one. + self.rotate_memory_session(self.session_id, messages) # Update session_log_file to point to the new session's JSON file self.session_log_file = self.logs_dir / f"session_{self.session_id}.json" self._session_db.create_session( diff --git a/tests/agent/test_memory_provider.py b/tests/agent/test_memory_provider.py index afd3dc002f..dc7f4b032a 100644 --- a/tests/agent/test_memory_provider.py +++ b/tests/agent/test_memory_provider.py @@ -698,61 +698,47 @@ class TestMemoryContextFencing: # --------------------------------------------------------------------------- -# MemoryManager.restart_session() tests +# MemoryManager.on_session_reset() tests # --------------------------------------------------------------------------- class ResettableProvider(FakeMemoryProvider): - """Provider that implements reset_session() for cheap session transitions.""" + """Provider that records on_session_reset calls for assertions.""" def __init__(self, name="resettable"): super().__init__(name) self.reset_session_calls = [] - def reset_session(self, new_session_id: str) -> None: + def on_session_reset(self, new_session_id: str) -> None: self.reset_session_calls.append(new_session_id) -class TestMemoryManagerRestartSession: - def test_restart_calls_reset_session_on_external(self): - """restart_session() calls reset_session() on external providers that have it.""" +class TestMemoryManagerOnSessionReset: + def test_fans_out_to_all_providers(self): mgr = MemoryManager() - builtin = FakeMemoryProvider("builtin") + builtin = ResettableProvider("builtin") external = ResettableProvider("openviking") mgr.add_provider(builtin) mgr.add_provider(external) - mgr.restart_session("new-session-123") + mgr.on_session_reset("new-session-123") + assert builtin.reset_session_calls == ["new-session-123"] assert external.reset_session_calls == ["new-session-123"] - # builtin is skipped — it has no per-session state - assert not hasattr(builtin, "reset_session_calls") - def test_restart_skips_builtin(self): - """restart_session() does not call anything on the builtin provider.""" - mgr = MemoryManager() - builtin = ResettableProvider("builtin") - mgr.add_provider(builtin) - - mgr.restart_session("new-session-456") - - assert builtin.reset_session_calls == [] - - def test_restart_falls_back_to_initialize(self): - """restart_session() calls initialize() when provider has no reset_session().""" + def test_base_default_is_noop(self): + """Providers that don't override on_session_reset get the base no-op.""" mgr = MemoryManager() builtin = FakeMemoryProvider("builtin") external = FakeMemoryProvider("honcho") mgr.add_provider(builtin) mgr.add_provider(external) - mgr.restart_session("fallback-session") + # Must not raise — default is a no-op + mgr.on_session_reset("noop-session") + assert not external.initialized - assert external.initialized - assert external._init_kwargs["session_id"] == "fallback-session" - - def test_restart_tolerates_provider_failure(self): - """restart_session() swallows failures so other providers are still called.""" + def test_tolerates_provider_failure(self): mgr = MemoryManager() builtin = FakeMemoryProvider("builtin") bad = ResettableProvider("bad-provider") @@ -760,32 +746,26 @@ class TestMemoryManagerRestartSession: def _explode(new_sid): raise RuntimeError("network error") - bad.reset_session = _explode - good = ResettableProvider("good-provider") - # Register bad provider first, but only one external is allowed — - # so test both providers by using the fallback path. + bad.on_session_reset = _explode mgr.add_provider(builtin) mgr.add_provider(bad) - # Calling restart_session should not raise even though the provider fails. - mgr.restart_session("safe-session") + mgr.on_session_reset("safe-session") # must not raise - def test_restart_no_providers_is_noop(self): - """restart_session() on an empty manager does not raise.""" + def test_no_providers_is_noop(self): mgr = MemoryManager() - mgr.restart_session("empty-session") # must not raise + mgr.on_session_reset("empty-session") # must not raise # --------------------------------------------------------------------------- -# OpenVikingMemoryProvider.reset_session() tests +# OpenVikingMemoryProvider.on_session_reset() tests # --------------------------------------------------------------------------- -class TestOpenVikingResetSession: +class TestOpenVikingOnSessionReset: """Unit tests for the cheap session-transition path in the OV plugin.""" def _make_provider(self): - """Return an OpenVikingMemoryProvider with a mock _client.""" try: from plugins.memory.openviking import OpenVikingMemoryProvider except ImportError: @@ -805,33 +785,24 @@ class TestOpenVikingResetSession: def test_reset_updates_session_id(self): provider, _ = self._make_provider() - provider.reset_session("new-session-abc") + provider.on_session_reset("new-session-abc") assert provider._session_id == "new-session-abc" def test_reset_clears_per_session_state(self): provider, _ = self._make_provider() - provider.reset_session("new-session-xyz") + provider.on_session_reset("new-session-xyz") assert provider._turn_count == 0 assert provider._prefetch_result == "" assert provider._sync_thread is None assert provider._prefetch_thread is None - def test_reset_creates_new_ov_session(self): + def test_reset_does_not_create_ov_session(self): + """OV auto-creates on first message; reset must not POST /sessions.""" provider, mock_client = self._make_provider() - provider.reset_session("new-session-post") - mock_client.post.assert_called_once_with( - "/api/v1/sessions", {"session_id": "new-session-post"} - ) + provider.on_session_reset("new-session-post") + mock_client.post.assert_not_called() - def test_reset_tolerates_ov_api_failure(self): - provider, mock_client = self._make_provider() - mock_client.post.side_effect = RuntimeError("connection refused") - # Must not raise — OV API failure is non-fatal for the reset path - provider.reset_session("no-server-session") - assert provider._session_id == "no-server-session" - - def test_reset_without_client_is_noop(self): - """reset_session() works even if provider was never initialized (no client).""" + def test_reset_without_client_is_safe(self): try: from plugins.memory.openviking import OpenVikingMemoryProvider except ImportError: @@ -845,6 +816,6 @@ class TestOpenVikingResetSession: provider._prefetch_thread = None provider._prefetch_result = "" - provider.reset_session("new-no-client") + provider.on_session_reset("new-no-client") assert provider._session_id == "new-no-client" assert provider._turn_count == 0 From 7cb06e3bb3b4954277f993fa388f81e55f428202 Mon Sep 17 00:00:00 2001 From: "zhiheng.liu" Date: Thu, 16 Apr 2026 00:38:19 +0800 Subject: [PATCH 15/16] =?UTF-8?q?refactor(memory):=20drop=20on=5Fsession?= =?UTF-8?q?=5Freset=20=E2=80=94=20commit-only=20is=20enough?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OV transparently handles message history across /new and /compress: old messages stay in the same session and extraction is idempotent, so there's no need to rebind providers to a new session_id. The only thing the session boundary actually needs is to trigger extraction. - MemoryProvider / MemoryManager: remove on_session_reset hook - OpenViking: remove on_session_reset override (nothing to do) - AIAgent: replace rotate_memory_session with commit_memory_session (just calls on_session_end, no rebind) - cli.py / run_agent.py: single commit_memory_session call at the session boundary before session_id rotates - tests: replace on_session_reset coverage with routing tests for MemoryManager.on_session_end Co-Authored-By: Claude Opus 4.6 --- agent/memory_manager.py | 16 ---- agent/memory_provider.py | 9 -- cli.py | 10 +-- plugins/memory/openviking/__init__.py | 16 ---- run_agent.py | 14 +-- tests/agent/test_memory_provider.py | 121 ++++---------------------- 6 files changed, 30 insertions(+), 156 deletions(-) diff --git a/agent/memory_manager.py b/agent/memory_manager.py index b677241594..6cd1c860b6 100644 --- a/agent/memory_manager.py +++ b/agent/memory_manager.py @@ -281,22 +281,6 @@ class MemoryManager: provider.name, e, ) - def on_session_reset(self, new_session_id: str) -> None: - """Notify all providers of a session reset. - - Called after on_session_end() has committed the previous session. - Providers with per-session state override on_session_reset to rebind - it cheaply (default is a no-op on the base class). - """ - for provider in self._providers: - try: - provider.on_session_reset(new_session_id) - except Exception as e: - logger.debug( - "Memory provider '%s' on_session_reset failed: %s", - provider.name, e, - ) - def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: """Notify all providers before context compression. diff --git a/agent/memory_provider.py b/agent/memory_provider.py index 9c6f0225cb..24593e3345 100644 --- a/agent/memory_provider.py +++ b/agent/memory_provider.py @@ -160,15 +160,6 @@ class MemoryProvider(ABC): (CLI exit, /reset, gateway session expiry). """ - def on_session_reset(self, new_session_id: str) -> None: - """Transition to a new session without full teardown. - - Called after on_session_end() has committed the previous session - (e.g. /new, context compression). Providers with per-session state - override to rebind counters/IDs while keeping HTTP clients alive. - Default: no-op. - """ - def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: """Called before context compression discards old messages. diff --git a/cli.py b/cli.py index a00eaf9702..fbc8f85250 100644 --- a/cli.py +++ b/cli.py @@ -4095,12 +4095,13 @@ class HermesCLI: def new_session(self, silent=False): """Start a fresh session with a new session ID and cleared agent state.""" - old_history = self.conversation_history - if self.agent and old_history: + if self.agent and self.conversation_history: try: - self.agent.flush_memories(old_history) + self.agent.flush_memories(self.conversation_history) except (Exception, KeyboardInterrupt): pass + # Trigger memory extraction on the old session before session_id rotates. + self.agent.commit_memory_session(self.conversation_history) self._notify_session_boundary("on_session_finalize") elif self.agent: # First session or empty history — still finalize the old session @@ -4149,9 +4150,6 @@ class HermesCLI: ) except Exception: pass - # Commit the old session and rebind memory providers to the - # new session_id so subsequent turns are tracked correctly. - self.agent.rotate_memory_session(self.session_id, old_history) self._notify_session_boundary("on_session_reset") if not silent: diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 4251927ccc..86d7ad5efb 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -516,22 +516,6 @@ class OpenVikingMemoryProvider(MemoryProvider): except Exception as e: return tool_error(str(e)) - def on_session_reset(self, new_session_id: str) -> None: - """Rebind per-session state to new_session_id. OV auto-creates the - session when the first message is added, so no create call here.""" - for t in (self._sync_thread, self._prefetch_thread): - if t and t.is_alive(): - t.join(timeout=5.0) - - self._session_id = new_session_id - self._turn_count = 0 - self._prefetch_result = "" - self._sync_thread = None - self._prefetch_thread = None - - global _last_active_provider - _last_active_provider = self - def shutdown(self) -> None: # Wait for background threads to finish for t in (self._sync_thread, self._prefetch_thread): diff --git a/run_agent.py b/run_agent.py index a19857bc4e..d7d1249be9 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3040,15 +3040,15 @@ class AIAgent: except Exception: pass - def rotate_memory_session(self, new_session_id: str, messages: list = None) -> None: - """Commit the current memory session, then rebind providers to - new_session_id. Keeps HTTP clients/state alive across the transition. - Called when session_id rotates (e.g. /new, context compression).""" + def commit_memory_session(self, messages: list = None) -> None: + """Trigger end-of-session extraction without tearing providers down. + Called when session_id rotates (e.g. /new, context compression); + providers keep their state and continue running under the old + session_id — they just flush pending extraction now.""" if not self._memory_manager: return try: self._memory_manager.on_session_end(messages or []) - self._memory_manager.on_session_reset(new_session_id) except Exception: pass @@ -6838,11 +6838,11 @@ class AIAgent: try: # Propagate title to the new session with auto-numbering old_title = self._session_db.get_session_title(self.session_id) + # Trigger memory extraction on the old session before it rotates. + self.commit_memory_session(messages) self._session_db.end_session(self.session_id, "compression") old_session_id = self.session_id self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" - # Commit the old memory session and rebind providers to the new one. - self.rotate_memory_session(self.session_id, messages) # Update session_log_file to point to the new session's JSON file self.session_log_file = self.logs_dir / f"session_{self.session_id}.json" self._session_db.create_session( diff --git a/tests/agent/test_memory_provider.py b/tests/agent/test_memory_provider.py index dc7f4b032a..505f40bd55 100644 --- a/tests/agent/test_memory_provider.py +++ b/tests/agent/test_memory_provider.py @@ -698,124 +698,41 @@ class TestMemoryContextFencing: # --------------------------------------------------------------------------- -# MemoryManager.on_session_reset() tests +# AIAgent.commit_memory_session — routes to MemoryManager.on_session_end # --------------------------------------------------------------------------- -class ResettableProvider(FakeMemoryProvider): - """Provider that records on_session_reset calls for assertions.""" +class _CommitRecorder(FakeMemoryProvider): + """Provider that records on_session_end calls for assertions.""" - def __init__(self, name="resettable"): + def __init__(self, name="recorder"): super().__init__(name) - self.reset_session_calls = [] + self.end_calls = [] - def on_session_reset(self, new_session_id: str) -> None: - self.reset_session_calls.append(new_session_id) + def on_session_end(self, messages): + self.end_calls.append(list(messages or [])) -class TestMemoryManagerOnSessionReset: - def test_fans_out_to_all_providers(self): +class TestCommitMemorySessionRouting: + def test_on_session_end_fans_out(self): mgr = MemoryManager() - builtin = ResettableProvider("builtin") - external = ResettableProvider("openviking") + builtin = _CommitRecorder("builtin") + external = _CommitRecorder("openviking") mgr.add_provider(builtin) mgr.add_provider(external) - mgr.on_session_reset("new-session-123") + msgs = [{"role": "user", "content": "hi"}] + mgr.on_session_end(msgs) - assert builtin.reset_session_calls == ["new-session-123"] - assert external.reset_session_calls == ["new-session-123"] + assert builtin.end_calls == [msgs] + assert external.end_calls == [msgs] - def test_base_default_is_noop(self): - """Providers that don't override on_session_reset get the base no-op.""" + def test_on_session_end_tolerates_failure(self): mgr = MemoryManager() builtin = FakeMemoryProvider("builtin") - external = FakeMemoryProvider("honcho") - mgr.add_provider(builtin) - mgr.add_provider(external) - - # Must not raise — default is a no-op - mgr.on_session_reset("noop-session") - assert not external.initialized - - def test_tolerates_provider_failure(self): - mgr = MemoryManager() - builtin = FakeMemoryProvider("builtin") - bad = ResettableProvider("bad-provider") - - def _explode(new_sid): - raise RuntimeError("network error") - - bad.on_session_reset = _explode + bad = _CommitRecorder("bad-provider") + bad.on_session_end = lambda m: (_ for _ in ()).throw(RuntimeError("boom")) mgr.add_provider(builtin) mgr.add_provider(bad) - mgr.on_session_reset("safe-session") # must not raise - - def test_no_providers_is_noop(self): - mgr = MemoryManager() - mgr.on_session_reset("empty-session") # must not raise - - -# --------------------------------------------------------------------------- -# OpenVikingMemoryProvider.on_session_reset() tests -# --------------------------------------------------------------------------- - - -class TestOpenVikingOnSessionReset: - """Unit tests for the cheap session-transition path in the OV plugin.""" - - def _make_provider(self): - try: - from plugins.memory.openviking import OpenVikingMemoryProvider - except ImportError: - pytest.skip("openviking plugin not importable") - - provider = OpenVikingMemoryProvider() - provider._session_id = "old-session" - provider._turn_count = 5 - provider._prefetch_result = "cached result" - provider._sync_thread = None - provider._prefetch_thread = None - - mock_client = MagicMock() - mock_client.post.return_value = {} - provider._client = mock_client - return provider, mock_client - - def test_reset_updates_session_id(self): - provider, _ = self._make_provider() - provider.on_session_reset("new-session-abc") - assert provider._session_id == "new-session-abc" - - def test_reset_clears_per_session_state(self): - provider, _ = self._make_provider() - provider.on_session_reset("new-session-xyz") - assert provider._turn_count == 0 - assert provider._prefetch_result == "" - assert provider._sync_thread is None - assert provider._prefetch_thread is None - - def test_reset_does_not_create_ov_session(self): - """OV auto-creates on first message; reset must not POST /sessions.""" - provider, mock_client = self._make_provider() - provider.on_session_reset("new-session-post") - mock_client.post.assert_not_called() - - def test_reset_without_client_is_safe(self): - try: - from plugins.memory.openviking import OpenVikingMemoryProvider - except ImportError: - pytest.skip("openviking plugin not importable") - - provider = OpenVikingMemoryProvider() - provider._client = None - provider._session_id = "old" - provider._turn_count = 3 - provider._sync_thread = None - provider._prefetch_thread = None - provider._prefetch_result = "" - - provider.on_session_reset("new-no-client") - assert provider._session_id == "new-no-client" - assert provider._turn_count == 0 + mgr.on_session_end([]) # must not raise From d1d425e9d0e0e37bc0855fe5a4142bac86a73b0d Mon Sep 17 00:00:00 2001 From: Teknium Date: Wed, 15 Apr 2026 11:08:14 -0700 Subject: [PATCH 16/16] chore: add ZaynJarvis bytedance email to AUTHOR_MAP --- scripts/release.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/release.py b/scripts/release.py index b875eb8a54..73d663e55a 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -198,6 +198,7 @@ AUTHOR_MAP = { "zqiao@microsoft.com": "tomqiaozc", "zzn+pa@zzn.im": "xinbenlv", "zaynjarvis@gmail.com": "ZaynJarvis", + "zhiheng.liu@bytedance.com": "ZaynJarvis", }