diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 410979a0ec..596080c32e 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -23,6 +23,7 @@ Capabilities: from __future__ import annotations +import atexit import json import logging import os @@ -37,6 +38,30 @@ _DEFAULT_ENDPOINT = "http://127.0.0.1:1933" _TIMEOUT = 30.0 +# --------------------------------------------------------------------------- +# Process-level atexit safety net — ensures pending sessions are committed +# even if shutdown_memory_provider is never called (e.g. gateway crash, +# SIGKILL, or exception in _async_flush_memories preventing shutdown). +# --------------------------------------------------------------------------- +_last_active_provider: Optional["OpenVikingMemoryProvider"] = None + + +def _atexit_commit_sessions(): + """Fire on_session_end for the last active provider on process exit.""" + global _last_active_provider + provider = _last_active_provider + if provider is None: + return + _last_active_provider = None + try: + provider.on_session_end([]) + except Exception: + pass # best-effort at shutdown time + + +atexit.register(_atexit_commit_sessions) + + # --------------------------------------------------------------------------- # HTTP helper — uses httpx to avoid requiring the openviking SDK # --------------------------------------------------------------------------- @@ -277,6 +302,10 @@ class OpenVikingMemoryProvider(MemoryProvider): logger.warning("httpx not installed — OpenViking plugin disabled") self._client = None + # Register as the last active provider for atexit safety net + global _last_active_provider + _last_active_provider = self + def system_prompt_block(self) -> str: if not self._client: return "" @@ -387,13 +416,18 @@ class OpenVikingMemoryProvider(MemoryProvider): OpenViking automatically extracts 6 categories of memories: profile, preferences, entities, events, cases, and patterns. """ - if not self._client or self._turn_count == 0: + if not self._client: return - # Wait for any pending sync to finish first + # Wait for any pending sync to finish first — do this before the + # turn_count check so the last turn's messages are flushed even if + # the count hasn't been incremented yet. if self._sync_thread and self._sync_thread.is_alive(): self._sync_thread.join(timeout=10.0) + if self._turn_count == 0: + return + try: self._client.post(f"/api/v1/sessions/{self._session_id}/commit") logger.info("OpenViking session %s committed (%d turns)", self._session_id, self._turn_count) @@ -449,6 +483,10 @@ class OpenVikingMemoryProvider(MemoryProvider): for t in (self._sync_thread, self._prefetch_thread): if t and t.is_alive(): t.join(timeout=5.0) + # Clear atexit reference so it doesn't double-commit + global _last_active_provider + if _last_active_provider is self: + _last_active_provider = None # -- Tool implementations ------------------------------------------------