fix(openviking): add atexit safety net for session commit

Ensures pending sessions are committed on process exit even if
shutdown_memory_provider is never called (gateway crash, SIGKILL,
or exception in _async_flush_memories preventing shutdown).

Also reorders on_session_end to wait for the pending sync thread
before checking turn_count, so the last turn's messages are flushed.

Based on PR #4919 by dagbs.
This commit is contained in:
dagbs 2026-04-06 16:45:13 -07:00 committed by Teknium
parent f071b1832a
commit e7698521e7

View file

@ -23,6 +23,7 @@ Capabilities:
from __future__ import annotations
import atexit
import json
import logging
import os
@ -37,6 +38,30 @@ _DEFAULT_ENDPOINT = "http://127.0.0.1:1933"
_TIMEOUT = 30.0
# ---------------------------------------------------------------------------
# Process-level atexit safety net — ensures pending sessions are committed
# even if shutdown_memory_provider is never called (e.g. gateway crash,
# SIGKILL, or exception in _async_flush_memories preventing shutdown).
# ---------------------------------------------------------------------------
_last_active_provider: Optional["OpenVikingMemoryProvider"] = None
def _atexit_commit_sessions():
"""Fire on_session_end for the last active provider on process exit."""
global _last_active_provider
provider = _last_active_provider
if provider is None:
return
_last_active_provider = None
try:
provider.on_session_end([])
except Exception:
pass # best-effort at shutdown time
atexit.register(_atexit_commit_sessions)
# ---------------------------------------------------------------------------
# HTTP helper — uses httpx to avoid requiring the openviking SDK
# ---------------------------------------------------------------------------
@ -277,6 +302,10 @@ class OpenVikingMemoryProvider(MemoryProvider):
logger.warning("httpx not installed — OpenViking plugin disabled")
self._client = None
# Register as the last active provider for atexit safety net
global _last_active_provider
_last_active_provider = self
def system_prompt_block(self) -> str:
if not self._client:
return ""
@ -387,13 +416,18 @@ class OpenVikingMemoryProvider(MemoryProvider):
OpenViking automatically extracts 6 categories of memories:
profile, preferences, entities, events, cases, and patterns.
"""
if not self._client or self._turn_count == 0:
if not self._client:
return
# Wait for any pending sync to finish first
# Wait for any pending sync to finish first — do this before the
# turn_count check so the last turn's messages are flushed even if
# the count hasn't been incremented yet.
if self._sync_thread and self._sync_thread.is_alive():
self._sync_thread.join(timeout=10.0)
if self._turn_count == 0:
return
try:
self._client.post(f"/api/v1/sessions/{self._session_id}/commit")
logger.info("OpenViking session %s committed (%d turns)", self._session_id, self._turn_count)
@ -449,6 +483,10 @@ class OpenVikingMemoryProvider(MemoryProvider):
for t in (self._sync_thread, self._prefetch_thread):
if t and t.is_alive():
t.join(timeout=5.0)
# Clear atexit reference so it doesn't double-commit
global _last_active_provider
if _last_active_provider is self:
_last_active_provider = None
# -- Tool implementations ------------------------------------------------