mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(openviking): commit session on /new and context compression
The OpenViking memory provider extracts memories when its session is
committed (POST /api/v1/sessions/{id}/commit). Before this fix, the
CLI had two code paths that changed the active session_id without ever
committing the outgoing OpenViking session:
1. /new (new_session() in cli.py) — called flush_memories() to write
MEMORY.md, then immediately discarded the old session_id. The
accumulated OpenViking session was never committed, so all context
from that session was lost before extraction could run.
2. /compress and auto-compress (_compress_context() in run_agent.py) —
split the SQLite session (new session_id) but left the OpenViking
provider pointing at the old session_id with no commit, meaning all
messages synced to OpenViking were silently orphaned.
The gateway already handles session commit on /new and /reset via
shutdown_memory_provider() on the cached agent; the CLI path did not.
Fix: introduce a lightweight session-transition lifecycle alongside
the existing full shutdown path:
- OpenVikingMemoryProvider.reset_session(new_session_id): waits for
in-flight background threads, resets per-session counters, and
creates the new OV session via POST /api/v1/sessions — without
tearing down the HTTP client (avoids connection overhead on /new).
- MemoryManager.restart_session(new_session_id): calls reset_session()
on providers that implement it; falls back to initialize() for
providers that do not. Skips the builtin provider (no per-session
state).
- AIAgent.commit_memory_session(messages): wraps
memory_manager.on_session_end() without shutdown — commits OV session
for extraction but leaves the provider alive for the next session.
- AIAgent.reinitialize_memory_session(new_session_id): wraps
memory_manager.restart_session() — transitions all external providers
to the new session after session_id has been assigned.
Call sites:
- cli.py new_session(): commit BEFORE session_id changes, reinitialize
AFTER — ensuring OV extraction runs on the correct session and the
new session is immediately ready for the next turn.
- run_agent._compress_context(): same pattern, inside the
if self._session_db: block where the session_id split happens.
/compress and auto-compress are functionally identical at this layer:
both call _compress_context(), so both are fixed by the same change.
Tests added to tests/agent/test_memory_provider.py:
- TestMemoryManagerRestartSession: reset_session() routing, builtin
skip, initialize() fallback, failure tolerance, empty-manager noop.
- TestOpenVikingResetSession: session_id update, per-session state
clear, POST /api/v1/sessions call, API failure tolerance, no-client
noop.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
f3ec4b3a16
commit
7856d304f2
5 changed files with 250 additions and 0 deletions
|
|
@ -281,6 +281,28 @@ class MemoryManager:
|
|||
provider.name, e,
|
||||
)
|
||||
|
||||
def restart_session(self, new_session_id: str) -> None:
|
||||
"""Transition external providers to a new session without full teardown.
|
||||
|
||||
Must be called AFTER on_session_end() has committed the old session.
|
||||
Providers that implement reset_session() are transitioned cheaply
|
||||
(HTTP client kept alive); others fall back to a full initialize().
|
||||
The builtin provider is skipped — it has no per-session state.
|
||||
"""
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin":
|
||||
continue
|
||||
try:
|
||||
if hasattr(provider, "reset_session"):
|
||||
provider.reset_session(new_session_id)
|
||||
else:
|
||||
provider.initialize(session_id=new_session_id)
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Memory provider '%s' restart_session failed: %s",
|
||||
provider.name, e,
|
||||
)
|
||||
|
||||
def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str:
|
||||
"""Notify all providers before context compression.
|
||||
|
||||
|
|
|
|||
14
cli.py
14
cli.py
|
|
@ -4100,6 +4100,13 @@ class HermesCLI:
|
|||
self.agent.flush_memories(self.conversation_history)
|
||||
except (Exception, KeyboardInterrupt):
|
||||
pass
|
||||
# Commit external memory providers (e.g. OpenViking) BEFORE
|
||||
# session_id changes so extraction runs on the correct session.
|
||||
if hasattr(self.agent, "commit_memory_session"):
|
||||
try:
|
||||
self.agent.commit_memory_session(self.conversation_history)
|
||||
except Exception:
|
||||
pass
|
||||
self._notify_session_boundary("on_session_finalize")
|
||||
elif self.agent:
|
||||
# First session or empty history — still finalize the old session
|
||||
|
|
@ -4148,6 +4155,13 @@ class HermesCLI:
|
|||
)
|
||||
except Exception:
|
||||
pass
|
||||
# Reinitialize external memory providers with the new session_id
|
||||
# so subsequent turns are tracked under the new session.
|
||||
if hasattr(self.agent, "reinitialize_memory_session"):
|
||||
try:
|
||||
self.agent.reinitialize_memory_session(self.session_id)
|
||||
except Exception:
|
||||
pass
|
||||
self._notify_session_boundary("on_session_reset")
|
||||
|
||||
if not silent:
|
||||
|
|
|
|||
|
|
@ -533,6 +533,34 @@ class OpenVikingMemoryProvider(MemoryProvider):
|
|||
except Exception as e:
|
||||
return tool_error(str(e))
|
||||
|
||||
def reset_session(self, new_session_id: str) -> None:
|
||||
"""Transition to a new session without tearing down the HTTP client.
|
||||
|
||||
Called by MemoryManager.restart_session() after on_session_end() has
|
||||
committed the old session (e.g. after CLI /new or context compression).
|
||||
Lighter than shutdown() + initialize(): keeps the httpx client alive,
|
||||
resets per-session counters, and creates the new OV session.
|
||||
"""
|
||||
for t in (self._sync_thread, self._prefetch_thread):
|
||||
if t and t.is_alive():
|
||||
t.join(timeout=5.0)
|
||||
|
||||
self._session_id = new_session_id
|
||||
self._turn_count = 0
|
||||
self._prefetch_result = ""
|
||||
self._sync_thread = None
|
||||
self._prefetch_thread = None
|
||||
|
||||
if self._client:
|
||||
try:
|
||||
self._client.post("/api/v1/sessions", {"session_id": self._session_id})
|
||||
logger.info("OpenViking new session %s created", self._session_id)
|
||||
except Exception as e:
|
||||
logger.debug("OpenViking session creation on reset: %s", e)
|
||||
|
||||
global _last_active_provider
|
||||
_last_active_provider = self
|
||||
|
||||
def shutdown(self) -> None:
|
||||
# Wait for background threads to finish
|
||||
for t in (self._sync_thread, self._prefetch_thread):
|
||||
|
|
|
|||
33
run_agent.py
33
run_agent.py
|
|
@ -3040,6 +3040,34 @@ class AIAgent:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
def commit_memory_session(self, messages: list = None) -> None:
|
||||
"""Commit external memory providers for the current session.
|
||||
|
||||
Calls on_session_end() WITHOUT shutting down providers — the session
|
||||
data (e.g. OpenViking) is committed for extraction, but the HTTP
|
||||
client and provider state remain alive for the next session.
|
||||
Called before session_id changes (e.g. /new, context compression).
|
||||
"""
|
||||
if self._memory_manager:
|
||||
try:
|
||||
self._memory_manager.on_session_end(messages or [])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def reinitialize_memory_session(self, new_session_id: str) -> None:
|
||||
"""Transition memory providers to a new session after commit.
|
||||
|
||||
Calls restart_session() which uses reset_session() on providers that
|
||||
support it (cheap: keeps HTTP client, resets per-session counters) or
|
||||
falls back to initialize() for providers that don't.
|
||||
Called after session_id has been assigned (e.g. /new, compression).
|
||||
"""
|
||||
if self._memory_manager:
|
||||
try:
|
||||
self._memory_manager.restart_session(new_session_id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def close(self) -> None:
|
||||
"""Release all resources held by this agent instance.
|
||||
|
||||
|
|
@ -6826,9 +6854,14 @@ class AIAgent:
|
|||
try:
|
||||
# Propagate title to the new session with auto-numbering
|
||||
old_title = self._session_db.get_session_title(self.session_id)
|
||||
# Commit external memory (e.g. OpenViking) before session_id
|
||||
# changes so extraction runs on the correct session.
|
||||
self.commit_memory_session([])
|
||||
self._session_db.end_session(self.session_id, "compression")
|
||||
old_session_id = self.session_id
|
||||
self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
||||
# Transition external memory providers to the new session_id.
|
||||
self.reinitialize_memory_session(self.session_id)
|
||||
# Update session_log_file to point to the new session's JSON file
|
||||
self.session_log_file = self.logs_dir / f"session_{self.session_id}.json"
|
||||
self._session_db.create_session(
|
||||
|
|
|
|||
|
|
@ -695,3 +695,156 @@ class TestMemoryContextFencing:
|
|||
fence_end = combined.index("</memory-context>")
|
||||
assert "Alice" in combined[fence_start:fence_end]
|
||||
assert combined.index("weather") < fence_start
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MemoryManager.restart_session() tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class ResettableProvider(FakeMemoryProvider):
|
||||
"""Provider that implements reset_session() for cheap session transitions."""
|
||||
|
||||
def __init__(self, name="resettable"):
|
||||
super().__init__(name)
|
||||
self.reset_session_calls = []
|
||||
|
||||
def reset_session(self, new_session_id: str) -> None:
|
||||
self.reset_session_calls.append(new_session_id)
|
||||
|
||||
|
||||
class TestMemoryManagerRestartSession:
|
||||
def test_restart_calls_reset_session_on_external(self):
|
||||
"""restart_session() calls reset_session() on external providers that have it."""
|
||||
mgr = MemoryManager()
|
||||
builtin = FakeMemoryProvider("builtin")
|
||||
external = ResettableProvider("openviking")
|
||||
mgr.add_provider(builtin)
|
||||
mgr.add_provider(external)
|
||||
|
||||
mgr.restart_session("new-session-123")
|
||||
|
||||
assert external.reset_session_calls == ["new-session-123"]
|
||||
# builtin is skipped — it has no per-session state
|
||||
assert not hasattr(builtin, "reset_session_calls")
|
||||
|
||||
def test_restart_skips_builtin(self):
|
||||
"""restart_session() does not call anything on the builtin provider."""
|
||||
mgr = MemoryManager()
|
||||
builtin = ResettableProvider("builtin")
|
||||
mgr.add_provider(builtin)
|
||||
|
||||
mgr.restart_session("new-session-456")
|
||||
|
||||
assert builtin.reset_session_calls == []
|
||||
|
||||
def test_restart_falls_back_to_initialize(self):
|
||||
"""restart_session() calls initialize() when provider has no reset_session()."""
|
||||
mgr = MemoryManager()
|
||||
builtin = FakeMemoryProvider("builtin")
|
||||
external = FakeMemoryProvider("honcho")
|
||||
mgr.add_provider(builtin)
|
||||
mgr.add_provider(external)
|
||||
|
||||
mgr.restart_session("fallback-session")
|
||||
|
||||
assert external.initialized
|
||||
assert external._init_kwargs["session_id"] == "fallback-session"
|
||||
|
||||
def test_restart_tolerates_provider_failure(self):
|
||||
"""restart_session() swallows failures so other providers are still called."""
|
||||
mgr = MemoryManager()
|
||||
builtin = FakeMemoryProvider("builtin")
|
||||
bad = ResettableProvider("bad-provider")
|
||||
|
||||
def _explode(new_sid):
|
||||
raise RuntimeError("network error")
|
||||
|
||||
bad.reset_session = _explode
|
||||
good = ResettableProvider("good-provider")
|
||||
# Register bad provider first, but only one external is allowed —
|
||||
# so test both providers by using the fallback path.
|
||||
mgr.add_provider(builtin)
|
||||
mgr.add_provider(bad)
|
||||
|
||||
# Calling restart_session should not raise even though the provider fails.
|
||||
mgr.restart_session("safe-session")
|
||||
|
||||
def test_restart_no_providers_is_noop(self):
|
||||
"""restart_session() on an empty manager does not raise."""
|
||||
mgr = MemoryManager()
|
||||
mgr.restart_session("empty-session") # must not raise
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# OpenVikingMemoryProvider.reset_session() tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOpenVikingResetSession:
|
||||
"""Unit tests for the cheap session-transition path in the OV plugin."""
|
||||
|
||||
def _make_provider(self):
|
||||
"""Return an OpenVikingMemoryProvider with a mock _client."""
|
||||
try:
|
||||
from plugins.memory.openviking import OpenVikingMemoryProvider
|
||||
except ImportError:
|
||||
pytest.skip("openviking plugin not importable")
|
||||
|
||||
provider = OpenVikingMemoryProvider()
|
||||
provider._session_id = "old-session"
|
||||
provider._turn_count = 5
|
||||
provider._prefetch_result = "cached result"
|
||||
provider._sync_thread = None
|
||||
provider._prefetch_thread = None
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.post.return_value = {}
|
||||
provider._client = mock_client
|
||||
return provider, mock_client
|
||||
|
||||
def test_reset_updates_session_id(self):
|
||||
provider, _ = self._make_provider()
|
||||
provider.reset_session("new-session-abc")
|
||||
assert provider._session_id == "new-session-abc"
|
||||
|
||||
def test_reset_clears_per_session_state(self):
|
||||
provider, _ = self._make_provider()
|
||||
provider.reset_session("new-session-xyz")
|
||||
assert provider._turn_count == 0
|
||||
assert provider._prefetch_result == ""
|
||||
assert provider._sync_thread is None
|
||||
assert provider._prefetch_thread is None
|
||||
|
||||
def test_reset_creates_new_ov_session(self):
|
||||
provider, mock_client = self._make_provider()
|
||||
provider.reset_session("new-session-post")
|
||||
mock_client.post.assert_called_once_with(
|
||||
"/api/v1/sessions", {"session_id": "new-session-post"}
|
||||
)
|
||||
|
||||
def test_reset_tolerates_ov_api_failure(self):
|
||||
provider, mock_client = self._make_provider()
|
||||
mock_client.post.side_effect = RuntimeError("connection refused")
|
||||
# Must not raise — OV API failure is non-fatal for the reset path
|
||||
provider.reset_session("no-server-session")
|
||||
assert provider._session_id == "no-server-session"
|
||||
|
||||
def test_reset_without_client_is_noop(self):
|
||||
"""reset_session() works even if provider was never initialized (no client)."""
|
||||
try:
|
||||
from plugins.memory.openviking import OpenVikingMemoryProvider
|
||||
except ImportError:
|
||||
pytest.skip("openviking plugin not importable")
|
||||
|
||||
provider = OpenVikingMemoryProvider()
|
||||
provider._client = None
|
||||
provider._session_id = "old"
|
||||
provider._turn_count = 3
|
||||
provider._sync_thread = None
|
||||
provider._prefetch_thread = None
|
||||
provider._prefetch_result = ""
|
||||
|
||||
provider.reset_session("new-no-client")
|
||||
assert provider._session_id == "new-no-client"
|
||||
assert provider._turn_count == 0
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue