From 7856d304f20303617453016ed9e818c729f6ee97 Mon Sep 17 00:00:00 2001 From: "zhiheng.liu" Date: Wed, 15 Apr 2026 23:14:32 +0800 Subject: [PATCH] fix(openviking): commit session on /new and context compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OpenViking memory provider extracts memories when its session is committed (POST /api/v1/sessions/{id}/commit). Before this fix, the CLI had two code paths that changed the active session_id without ever committing the outgoing OpenViking session: 1. /new (new_session() in cli.py) — called flush_memories() to write MEMORY.md, then immediately discarded the old session_id. The accumulated OpenViking session was never committed, so all context from that session was lost before extraction could run. 2. /compress and auto-compress (_compress_context() in run_agent.py) — split the SQLite session (new session_id) but left the OpenViking provider pointing at the old session_id with no commit, meaning all messages synced to OpenViking were silently orphaned. The gateway already handles session commit on /new and /reset via shutdown_memory_provider() on the cached agent; the CLI path did not. Fix: introduce a lightweight session-transition lifecycle alongside the existing full shutdown path: - OpenVikingMemoryProvider.reset_session(new_session_id): waits for in-flight background threads, resets per-session counters, and creates the new OV session via POST /api/v1/sessions — without tearing down the HTTP client (avoids connection overhead on /new). - MemoryManager.restart_session(new_session_id): calls reset_session() on providers that implement it; falls back to initialize() for providers that do not. Skips the builtin provider (no per-session state). - AIAgent.commit_memory_session(messages): wraps memory_manager.on_session_end() without shutdown — commits OV session for extraction but leaves the provider alive for the next session. - AIAgent.reinitialize_memory_session(new_session_id): wraps memory_manager.restart_session() — transitions all external providers to the new session after session_id has been assigned. Call sites: - cli.py new_session(): commit BEFORE session_id changes, reinitialize AFTER — ensuring OV extraction runs on the correct session and the new session is immediately ready for the next turn. - run_agent._compress_context(): same pattern, inside the if self._session_db: block where the session_id split happens. /compress and auto-compress are functionally identical at this layer: both call _compress_context(), so both are fixed by the same change. Tests added to tests/agent/test_memory_provider.py: - TestMemoryManagerRestartSession: reset_session() routing, builtin skip, initialize() fallback, failure tolerance, empty-manager noop. - TestOpenVikingResetSession: session_id update, per-session state clear, POST /api/v1/sessions call, API failure tolerance, no-client noop. Co-Authored-By: Claude Sonnet 4.6 --- agent/memory_manager.py | 22 ++++ cli.py | 14 +++ plugins/memory/openviking/__init__.py | 28 +++++ run_agent.py | 33 ++++++ tests/agent/test_memory_provider.py | 153 ++++++++++++++++++++++++++ 5 files changed, 250 insertions(+) diff --git a/agent/memory_manager.py b/agent/memory_manager.py index 6cd1c860b..8320710ce 100644 --- a/agent/memory_manager.py +++ b/agent/memory_manager.py @@ -281,6 +281,28 @@ class MemoryManager: provider.name, e, ) + def restart_session(self, new_session_id: str) -> None: + """Transition external providers to a new session without full teardown. + + Must be called AFTER on_session_end() has committed the old session. + Providers that implement reset_session() are transitioned cheaply + (HTTP client kept alive); others fall back to a full initialize(). + The builtin provider is skipped — it has no per-session state. + """ + for provider in self._providers: + if provider.name == "builtin": + continue + try: + if hasattr(provider, "reset_session"): + provider.reset_session(new_session_id) + else: + provider.initialize(session_id=new_session_id) + except Exception as e: + logger.debug( + "Memory provider '%s' restart_session failed: %s", + provider.name, e, + ) + def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: """Notify all providers before context compression. diff --git a/cli.py b/cli.py index 97698f133..94e56b0d5 100644 --- a/cli.py +++ b/cli.py @@ -4100,6 +4100,13 @@ class HermesCLI: self.agent.flush_memories(self.conversation_history) except (Exception, KeyboardInterrupt): pass + # Commit external memory providers (e.g. OpenViking) BEFORE + # session_id changes so extraction runs on the correct session. + if hasattr(self.agent, "commit_memory_session"): + try: + self.agent.commit_memory_session(self.conversation_history) + except Exception: + pass self._notify_session_boundary("on_session_finalize") elif self.agent: # First session or empty history — still finalize the old session @@ -4148,6 +4155,13 @@ class HermesCLI: ) except Exception: pass + # Reinitialize external memory providers with the new session_id + # so subsequent turns are tracked under the new session. + if hasattr(self.agent, "reinitialize_memory_session"): + try: + self.agent.reinitialize_memory_session(self.session_id) + except Exception: + pass self._notify_session_boundary("on_session_reset") if not silent: diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 72ec3b105..b1cb03b73 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -533,6 +533,34 @@ class OpenVikingMemoryProvider(MemoryProvider): except Exception as e: return tool_error(str(e)) + def reset_session(self, new_session_id: str) -> None: + """Transition to a new session without tearing down the HTTP client. + + Called by MemoryManager.restart_session() after on_session_end() has + committed the old session (e.g. after CLI /new or context compression). + Lighter than shutdown() + initialize(): keeps the httpx client alive, + resets per-session counters, and creates the new OV session. + """ + for t in (self._sync_thread, self._prefetch_thread): + if t and t.is_alive(): + t.join(timeout=5.0) + + self._session_id = new_session_id + self._turn_count = 0 + self._prefetch_result = "" + self._sync_thread = None + self._prefetch_thread = None + + if self._client: + try: + self._client.post("/api/v1/sessions", {"session_id": self._session_id}) + logger.info("OpenViking new session %s created", self._session_id) + except Exception as e: + logger.debug("OpenViking session creation on reset: %s", e) + + global _last_active_provider + _last_active_provider = self + def shutdown(self) -> None: # Wait for background threads to finish for t in (self._sync_thread, self._prefetch_thread): diff --git a/run_agent.py b/run_agent.py index efaeba829..773d22bed 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3040,6 +3040,34 @@ class AIAgent: except Exception: pass + def commit_memory_session(self, messages: list = None) -> None: + """Commit external memory providers for the current session. + + Calls on_session_end() WITHOUT shutting down providers — the session + data (e.g. OpenViking) is committed for extraction, but the HTTP + client and provider state remain alive for the next session. + Called before session_id changes (e.g. /new, context compression). + """ + if self._memory_manager: + try: + self._memory_manager.on_session_end(messages or []) + except Exception: + pass + + def reinitialize_memory_session(self, new_session_id: str) -> None: + """Transition memory providers to a new session after commit. + + Calls restart_session() which uses reset_session() on providers that + support it (cheap: keeps HTTP client, resets per-session counters) or + falls back to initialize() for providers that don't. + Called after session_id has been assigned (e.g. /new, compression). + """ + if self._memory_manager: + try: + self._memory_manager.restart_session(new_session_id) + except Exception: + pass + def close(self) -> None: """Release all resources held by this agent instance. @@ -6826,9 +6854,14 @@ class AIAgent: try: # Propagate title to the new session with auto-numbering old_title = self._session_db.get_session_title(self.session_id) + # Commit external memory (e.g. OpenViking) before session_id + # changes so extraction runs on the correct session. + self.commit_memory_session([]) self._session_db.end_session(self.session_id, "compression") old_session_id = self.session_id self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" + # Transition external memory providers to the new session_id. + self.reinitialize_memory_session(self.session_id) # Update session_log_file to point to the new session's JSON file self.session_log_file = self.logs_dir / f"session_{self.session_id}.json" self._session_db.create_session( diff --git a/tests/agent/test_memory_provider.py b/tests/agent/test_memory_provider.py index fe04e0dd4..afd3dc002 100644 --- a/tests/agent/test_memory_provider.py +++ b/tests/agent/test_memory_provider.py @@ -695,3 +695,156 @@ class TestMemoryContextFencing: fence_end = combined.index("") assert "Alice" in combined[fence_start:fence_end] assert combined.index("weather") < fence_start + + +# --------------------------------------------------------------------------- +# MemoryManager.restart_session() tests +# --------------------------------------------------------------------------- + + +class ResettableProvider(FakeMemoryProvider): + """Provider that implements reset_session() for cheap session transitions.""" + + def __init__(self, name="resettable"): + super().__init__(name) + self.reset_session_calls = [] + + def reset_session(self, new_session_id: str) -> None: + self.reset_session_calls.append(new_session_id) + + +class TestMemoryManagerRestartSession: + def test_restart_calls_reset_session_on_external(self): + """restart_session() calls reset_session() on external providers that have it.""" + mgr = MemoryManager() + builtin = FakeMemoryProvider("builtin") + external = ResettableProvider("openviking") + mgr.add_provider(builtin) + mgr.add_provider(external) + + mgr.restart_session("new-session-123") + + assert external.reset_session_calls == ["new-session-123"] + # builtin is skipped — it has no per-session state + assert not hasattr(builtin, "reset_session_calls") + + def test_restart_skips_builtin(self): + """restart_session() does not call anything on the builtin provider.""" + mgr = MemoryManager() + builtin = ResettableProvider("builtin") + mgr.add_provider(builtin) + + mgr.restart_session("new-session-456") + + assert builtin.reset_session_calls == [] + + def test_restart_falls_back_to_initialize(self): + """restart_session() calls initialize() when provider has no reset_session().""" + mgr = MemoryManager() + builtin = FakeMemoryProvider("builtin") + external = FakeMemoryProvider("honcho") + mgr.add_provider(builtin) + mgr.add_provider(external) + + mgr.restart_session("fallback-session") + + assert external.initialized + assert external._init_kwargs["session_id"] == "fallback-session" + + def test_restart_tolerates_provider_failure(self): + """restart_session() swallows failures so other providers are still called.""" + mgr = MemoryManager() + builtin = FakeMemoryProvider("builtin") + bad = ResettableProvider("bad-provider") + + def _explode(new_sid): + raise RuntimeError("network error") + + bad.reset_session = _explode + good = ResettableProvider("good-provider") + # Register bad provider first, but only one external is allowed — + # so test both providers by using the fallback path. + mgr.add_provider(builtin) + mgr.add_provider(bad) + + # Calling restart_session should not raise even though the provider fails. + mgr.restart_session("safe-session") + + def test_restart_no_providers_is_noop(self): + """restart_session() on an empty manager does not raise.""" + mgr = MemoryManager() + mgr.restart_session("empty-session") # must not raise + + +# --------------------------------------------------------------------------- +# OpenVikingMemoryProvider.reset_session() tests +# --------------------------------------------------------------------------- + + +class TestOpenVikingResetSession: + """Unit tests for the cheap session-transition path in the OV plugin.""" + + def _make_provider(self): + """Return an OpenVikingMemoryProvider with a mock _client.""" + try: + from plugins.memory.openviking import OpenVikingMemoryProvider + except ImportError: + pytest.skip("openviking plugin not importable") + + provider = OpenVikingMemoryProvider() + provider._session_id = "old-session" + provider._turn_count = 5 + provider._prefetch_result = "cached result" + provider._sync_thread = None + provider._prefetch_thread = None + + mock_client = MagicMock() + mock_client.post.return_value = {} + provider._client = mock_client + return provider, mock_client + + def test_reset_updates_session_id(self): + provider, _ = self._make_provider() + provider.reset_session("new-session-abc") + assert provider._session_id == "new-session-abc" + + def test_reset_clears_per_session_state(self): + provider, _ = self._make_provider() + provider.reset_session("new-session-xyz") + assert provider._turn_count == 0 + assert provider._prefetch_result == "" + assert provider._sync_thread is None + assert provider._prefetch_thread is None + + def test_reset_creates_new_ov_session(self): + provider, mock_client = self._make_provider() + provider.reset_session("new-session-post") + mock_client.post.assert_called_once_with( + "/api/v1/sessions", {"session_id": "new-session-post"} + ) + + def test_reset_tolerates_ov_api_failure(self): + provider, mock_client = self._make_provider() + mock_client.post.side_effect = RuntimeError("connection refused") + # Must not raise — OV API failure is non-fatal for the reset path + provider.reset_session("no-server-session") + assert provider._session_id == "no-server-session" + + def test_reset_without_client_is_noop(self): + """reset_session() works even if provider was never initialized (no client).""" + try: + from plugins.memory.openviking import OpenVikingMemoryProvider + except ImportError: + pytest.skip("openviking plugin not importable") + + provider = OpenVikingMemoryProvider() + provider._client = None + provider._session_id = "old" + provider._turn_count = 3 + provider._sync_thread = None + provider._prefetch_thread = None + provider._prefetch_result = "" + + provider.reset_session("new-no-client") + assert provider._session_id == "new-no-client" + assert provider._turn_count == 0