mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-19 10:02:16 +00:00
Follow-up cleanup on the OpenViking setup path merged in #48262: - _write_ovcli_config now uses utils.atomic_json_write(path, data, mode=0o600) instead of the local _precreate_secret_file + write_text + chmod sequence. The shared helper (already used by honcho/mem0/supermemory/hindsight) writes via temp-file + fchmod(0600) + fsync + os.replace, so the ovcli.conf is written atomically (no half-written secret file on crash) and with no chmod-after-write TOCTOU window. _precreate_secret_file stays for the .env writer path. - Remove dead _DEFAULT_ACCOUNT/_DEFAULT_USER constants (0 references; the empty->'default' tenant fallback lives in the _VikingClient constructor). Tests: tests/plugins/memory/test_openviking_provider.py + test_memory_setup.py + openviking_plugin/test_openviking.py -> 130 passed; ruff clean.
2770 lines
104 KiB
Python
2770 lines
104 KiB
Python
"""OpenViking memory plugin — full bidirectional MemoryProvider interface.
|
|
|
|
Context database by Volcengine (ByteDance) that organizes agent knowledge
|
|
into a filesystem hierarchy (viking:// URIs) with tiered context loading,
|
|
automatic memory extraction, and session management.
|
|
|
|
Original PR #3369 by Mibayy, rewritten to use the full OpenViking session
|
|
lifecycle instead of read-only search endpoints.
|
|
|
|
Config via environment variables (profile-scoped via each profile's .env)
|
|
or a linked OpenViking CLI config:
|
|
OPENVIKING_ENDPOINT — Server URL (default: http://127.0.0.1:1933)
|
|
OPENVIKING_API_KEY — API key (required for authenticated servers)
|
|
OPENVIKING_ACCOUNT — Optional tenant account override
|
|
OPENVIKING_USER — Optional tenant user override
|
|
OPENVIKING_AGENT — Tenant agent (default: hermes)
|
|
|
|
Capabilities:
|
|
- Automatic memory extraction on session commit (6 categories)
|
|
- Tiered context: L0 (~100 tokens), L1 (~2k), L2 (full)
|
|
- Semantic search with hierarchical directory retrieval
|
|
- Filesystem-style browsing via viking:// URIs
|
|
- Resource ingestion (URLs, docs, code)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import atexit
|
|
import json
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import uuid
|
|
import zipfile
|
|
from dataclasses import dataclass, replace
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
from urllib.parse import urlparse
|
|
from urllib.request import url2pathname
|
|
|
|
from agent.memory_provider import MemoryProvider
|
|
from agent.skill_commands import extract_user_instruction_from_skill_message
|
|
from tools.registry import tool_error
|
|
from utils import atomic_json_write
|
|
|
|
logger = logging.getLogger(__name__)

# --- Connection defaults ----------------------------------------------------
_DEFAULT_ENDPOINT = "http://127.0.0.1:1933"
_OPENVIKING_SERVICE_ENDPOINT = "https://api.vikingdb.cn-beijing.volces.com/openviking"
_DEFAULT_AGENT = "hermes"

# --- OpenViking CLI (ovcli) config discovery --------------------------------
# Environment variable that, when set, points at an explicit ovcli config file.
_OVCLI_CONFIG_ENV = "OPENVIKING_CLI_CONFIG_FILE"
# Location of the active ovcli config, relative to the user's home directory.
_OVCLI_DEFAULT_RELATIVE_PATH = ".openviking/ovcli.conf"
# Saved profiles sit next to the active config as "ovcli.conf.<name>".
_OVCLI_SAVED_PREFIX = "ovcli.conf."

# Environment variables that carry OpenViking connection settings.
_OPENVIKING_ENV_KEYS = (
    "OPENVIKING_ENDPOINT",
    "OPENVIKING_API_KEY",
    "OPENVIKING_ACCOUNT",
    "OPENVIKING_USER",
    "OPENVIKING_AGENT",
)

# --- Timeouts (seconds) -----------------------------------------------------
_TIMEOUT = 30.0
_SESSION_DRAIN_TIMEOUT = 10.0
# Two full request timeouts plus five seconds of slack.
_DEFERRED_COMMIT_TIMEOUT = 2 * _TIMEOUT + 5.0

# Resource sources starting with one of these are treated as remote
# (see _is_remote_resource_source).
_REMOTE_RESOURCE_PREFIXES = ("http://", "https://", "git@", "ssh://", "git://")

# viking_remember `category` value -> viking:// subdirectory.
# Keep in sync with REMEMBER_SCHEMA.parameters.properties.category.enum.
_CATEGORY_SUBDIR_MAP = dict(
    preference="preferences",
    entity="entities",
    event="events",
    case="cases",
    pattern="patterns",
)
_DEFAULT_MEMORY_SUBDIR = "preferences"

# Built-in memory tool `target` -> subdir used when mirroring on_memory_write:
# user profile facts go to preferences, agent notes / observations to
# patterns; unknown targets fall back to the default subdir.
_MEMORY_WRITE_TARGET_SUBDIR_MAP = dict(
    user="preferences",
    memory="patterns",
)

# --- Local server handling --------------------------------------------------
_LOCAL_OPENVIKING_HOSTS = {"localhost", "127.0.0.1", "::1"}
_LOCAL_OPENVIKING_AUTOSTART_TIMEOUT = 60.0
_OPENVIKING_SERVER_LOG_RELATIVE_PATH = Path("logs", "openviking-server.log")
_OPENVIKING_RESPONDED_FAILURE_PREFIX = "OpenViking server responded"

# Unique sentinel object (identity-compared, distinct from None).
_SETUP_CANCELLED = object()
|
|
|
|
|
|
@dataclass(frozen=True)
class _OvcliProfile:
    """One OpenViking CLI (ovcli) config file discovered on disk.

    Frozen dataclass: a copy with a different ``is_active`` must be made with
    ``dataclasses.replace``.
    """

    # Where the profile came from: "env" (file named by the ovcli env var),
    # "saved" (an ovcli.conf.<name> file) or "active" (the default ovcli.conf).
    source: str
    # Display name: the env var name, the saved-profile suffix, or "active".
    name: str
    # Path of the config file.
    path: Path
    # Raw JSON object parsed from the file.
    data: dict
    # Normalized connection values derived from ``data``
    # (endpoint / api_key / root_api_key / account / user / agent).
    values: dict
    # True when this profile matches the currently active ovcli.conf.
    is_active: bool = False
|
|
|
|
|
|
class _OpenVikingHTTPError(RuntimeError):
    """RuntimeError for a failed OpenViking HTTP call, carrying the HTTP status.

    ``status_code`` is None when no status is known.
    """

    status_code: Optional[int]

    def __init__(self, message: str, status_code: Optional[int] = None):
        RuntimeError.__init__(self, message)
        self.status_code = status_code
|
|
|
|
|
|
def _sanitize_openviking_error_message(message: str, status_code: Optional[int] = None) -> str:
    """Reduce a raw OpenViking error body to a short, readable message.

    * HTML pages become ``"HTTP <code>: <title>"``. When the title contains a
      ``|``, only the part after the first ``|`` is kept. A leading
      ``"<code>:"`` is dropped. Without a usable title, a generic
      "HTML error page" message is returned.
    * Other text longer than 300 characters is cut to 297 characters + "...".
    * Empty text becomes ``"HTTP <code>"`` (or ``"HTTP error"`` with no code).
    """
    body = (message or "").strip()
    prefix = "HTTP error" if not status_code else f"HTTP {status_code}"

    is_html = re.search(r"^\s*<(!doctype|html|head|body)\b", body, flags=re.IGNORECASE) is not None
    if not is_html:
        if len(body) <= 300:
            return body or prefix
        return body[:297].rstrip() + "..."

    title_match = re.search(r"<title[^>]*>(.*?)</title>", body, flags=re.IGNORECASE | re.DOTALL)
    title = re.sub(r"\s+", " ", title_match.group(1)).strip() if title_match else ""
    if "|" in title:
        title = title.split("|", 1)[1].strip()
    if status_code and title.startswith(f"{status_code}:"):
        title = title.split(":", 1)[1].strip()
    if title:
        return f"{prefix}: {title}"
    return f"{prefix}: OpenViking endpoint returned an HTML error page."
|
|
|
|
|
|
def _format_openviking_exception(error: Exception) -> str:
    """Render *error* as a short, user-facing OpenViking error message.

    The HTTP status is recovered from the exception when possible: either
    ``_OpenVikingHTTPError.status_code`` or ``error.response.status_code``.
    It is passed to ``_sanitize_openviking_error_message``, which collapses
    HTML error pages to "HTTP <code>: <title>" and truncates long text.
    """
    # Reuse the shared extractor rather than duplicating its branches here.
    return _sanitize_openviking_error_message(str(error), _status_code_from_error(error))
|
|
|
|
|
|
def _derive_openviking_user_text(content: Any) -> str:
    """Return *content* with Hermes slash-skill scaffolding removed ("" if nothing remains).

    Defense-in-depth: MemoryManager already strips skill scaffolding for the
    whole provider fan-out (see ``MemoryManager._strip_skill_scaffolding``).
    In normal operation this therefore receives clean text and passes it
    through unchanged. It is kept so OpenViking stays correct if its hooks are
    ever invoked outside the manager. The work is delegated to the canonical
    extractor in ``agent.skill_commands``, so no marker literals are
    duplicated here.
    """
    instruction = extract_user_instruction_from_skill_message(content)
    return instruction if instruction else ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Process-level atexit safety net — ensures pending sessions are committed
|
|
# even if shutdown_memory_provider is never called (e.g. gateway crash,
|
|
# SIGKILL, or exception in the session expiry watcher preventing shutdown).
|
|
# ---------------------------------------------------------------------------
|
|
# Provider consumed by the atexit hook below. It is assigned elsewhere in this
# module (not shown here) — presumably whenever a provider becomes active.
_last_active_provider: Optional["OpenVikingMemoryProvider"] = None


def _atexit_commit_sessions():
    """Fire ``on_session_end([])`` for the last active provider on process exit.

    The global is cleared before the call, so the hook acts at most once per
    registered provider. Failures are logged at debug level and never raised,
    because this runs during interpreter shutdown.
    """
    global _last_active_provider
    provider = _last_active_provider
    if provider is None:
        return
    _last_active_provider = None
    try:
        provider.on_session_end([])
    except Exception:
        # Best-effort at shutdown time: don't turn a failed commit into an exit
        # traceback, but leave a trace instead of swallowing it silently.
        logger.debug("OpenViking atexit session commit failed", exc_info=True)


atexit.register(_atexit_commit_sessions)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP helper — uses httpx to avoid requiring the openviking SDK
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_httpx():
    """Return the ``httpx`` module, or None when it is not installed.

    The import is deferred to call time, so loading this plugin does not
    itself require httpx.
    """
    try:
        import httpx
    except ImportError:
        return None
    return httpx
|
|
|
|
|
|
class _VikingClient:
    """Thin HTTP client for the OpenViking REST API.

    Talks to the server with ``httpx`` directly (no OpenViking SDK). Every
    request carries the tenant/auth headers built by ``_headers``. Responses
    are decoded by ``_parse_response``, which raises on HTTP errors and on
    ``{"status": "error"}`` payloads.
    """

    def __init__(self, endpoint: str, api_key: str = "",
                 account: Optional[str] = None, user: Optional[str] = None,
                 agent: Optional[str] = None):
        """Create a client for *endpoint*.

        Args:
            endpoint: Base URL; trailing slashes are stripped.
            api_key: Sent as ``X-API-Key`` and as a Bearer token when non-empty.
            account, user: Tenant identity. None or "" falls back to the
                ``OPENVIKING_ACCOUNT`` / ``OPENVIKING_USER`` env vars, and then
                to ``"default"`` when those are unset or empty.
            agent: Agent / actor-peer id. None falls back to
                ``OPENVIKING_AGENT`` and then ``_DEFAULT_AGENT``. An explicit ""
                omits the agent headers.

        Raises:
            ImportError: httpx is not installed.
        """
        self._endpoint = endpoint.rstrip("/")
        self._api_key = api_key
        # Empty account/user fall back to "default" and the tenant headers are
        # always sent — ROOT API keys require them (preserves the merged
        # contract from #22414/#21232; an empty string must NOT omit the
        # header). Use `or` (not `is not None`) so "" also falls back. The
        # second `or` covers a set-but-empty env var, for which
        # os.environ.get(name, "default") would return "" and drop the header.
        self._account = account or os.environ.get("OPENVIKING_ACCOUNT") or "default"
        self._user = user or os.environ.get("OPENVIKING_USER") or "default"
        self._agent = agent if agent is not None else os.environ.get("OPENVIKING_AGENT", _DEFAULT_AGENT)
        self._httpx = _get_httpx()
        if self._httpx is None:
            raise ImportError("httpx is required for OpenViking: pip install httpx")

    def _headers(self) -> dict:
        """JSON request headers: content type, tenant identity, agent and auth."""
        h = {"Content-Type": "application/json"}
        if self._agent:
            h["X-OpenViking-Actor-Peer"] = self._agent
            h["X-OpenViking-Agent"] = self._agent
        if self._account:
            h["X-OpenViking-Account"] = self._account
        if self._user:
            h["X-OpenViking-User"] = self._user
        if self._api_key:
            h["X-API-Key"] = self._api_key
            h["Authorization"] = "Bearer " + self._api_key
        return h

    def _url(self, path: str) -> str:
        """Join the base endpoint with an absolute API *path*."""
        return f"{self._endpoint}{path}"

    def _multipart_headers(self) -> dict:
        """Same as ``_headers`` minus Content-Type, so httpx can set the multipart boundary."""
        headers = self._headers()
        headers.pop("Content-Type", None)
        return headers

    def _parse_response(self, resp) -> dict:
        """Decode *resp* as JSON and raise on failure.

        Returns:
            The decoded JSON body, or ``{}`` when the body is not JSON (or is
            JSON null).

        Raises:
            _OpenVikingHTTPError: status >= 400. The message comes from a
                structured ``{"error": {...}}`` body when present, otherwise
                from the sanitized response text.
            RuntimeError: a non-error status whose body has ``"status": "error"``.
        """
        try:
            data = resp.json()
        except Exception:
            data = None

        if resp.status_code >= 400:
            message = _sanitize_openviking_error_message(
                getattr(resp, "text", ""),
                resp.status_code,
            )
            if isinstance(data, dict):
                error = data.get("error")
                if isinstance(error, dict):
                    code = error.get("code", "HTTP_ERROR")
                    message = f"{code}: {error.get('message', message)}"
                    raise _OpenVikingHTTPError(message, resp.status_code)
                if data.get("status") == "error":
                    raise _OpenVikingHTTPError(str(data), resp.status_code)
            raise _OpenVikingHTTPError(message or f"HTTP {resp.status_code}", resp.status_code)

        if isinstance(data, dict) and data.get("status") == "error":
            error = data.get("error")
            if isinstance(error, dict):
                code = error.get("code", "OPENVIKING_ERROR")
                message = error.get("message", "")
                raise RuntimeError(f"{code}: {message}")
            raise RuntimeError(str(data))

        if data is None:
            return {}
        return data

    def get(self, path: str, **kwargs) -> dict:
        """GET *path* and return the parsed response body."""
        resp = self._httpx.get(
            self._url(path), headers=self._headers(), timeout=_TIMEOUT, **kwargs
        )
        return self._parse_response(resp)

    def post(self, path: str, payload: Optional[dict] = None, **kwargs) -> dict:
        """POST *payload* (default ``{}``) as JSON to *path* and return the parsed body."""
        resp = self._httpx.post(
            self._url(path), json=payload or {}, headers=self._headers(),
            timeout=_TIMEOUT, **kwargs
        )
        return self._parse_response(resp)

    def upload_temp_file(self, file_path: Path) -> str:
        """Upload *file_path* via ``temp_upload`` and return the server's ``temp_file_id``.

        Raises:
            RuntimeError: the response carries no usable ``temp_file_id``.
        """
        mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
        with file_path.open("rb") as f:
            resp = self._httpx.post(
                self._url("/api/v1/resources/temp_upload"),
                files={"file": (file_path.name, f, mime_type)},
                headers=self._multipart_headers(),
                timeout=_TIMEOUT,
            )
        data = self._parse_response(resp)
        # Tolerate a missing, null or non-object "result" (or a non-object
        # body). The caller then gets the explicit RuntimeError below instead
        # of an AttributeError from calling .get() on None.
        result = data.get("result") if isinstance(data, dict) else None
        temp_file_id = result.get("temp_file_id", "") if isinstance(result, dict) else ""
        if not temp_file_id:
            raise RuntimeError("OpenViking temp upload did not return temp_file_id")
        return temp_file_id

    def health(self) -> bool:
        """Return True iff ``/health`` answers HTTP 200 within 3 s; never raises."""
        try:
            resp = self._httpx.get(
                self._url("/health"), headers=self._headers(), timeout=3.0
            )
            return resp.status_code == 200
        except Exception:
            return False

    def health_payload(self) -> dict:
        """GET ``/health`` (3 s timeout) and return the parsed body; raises like ``_parse_response``."""
        resp = self._httpx.get(
            self._url("/health"), headers=self._headers(), timeout=3.0
        )
        return self._parse_response(resp)

    def validate_auth(self) -> dict:
        """Validate authenticated OpenViking access without mutating state."""
        return self.get("/api/v1/system/status")

    def validate_root_access(self) -> dict:
        """Validate ROOT access against a read-only admin endpoint."""
        return self.get("/api/v1/admin/accounts")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool schemas
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Tool definitions: each dict carries the tool "name", a model-facing
# "description" and a JSON-schema "parameters" object.

# viking_search — semantic search; only "query" is required.
SEARCH_SCHEMA = {
    "name": "viking_search",
    "description": (
        "Semantic search over the OpenViking knowledge base. "
        "Returns ranked results with viking:// URIs for deeper reading. "
        "Use mode='deep' for complex queries that need reasoning across "
        "multiple sources, 'fast' for simple lookups."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query."},
            "mode": {
                "type": "string", "enum": ["auto", "fast", "deep"],
                "description": "Search depth (default: auto).",
            },
            "scope": {
                "type": "string",
                "description": "Viking URI prefix to scope search (e.g. 'viking://resources/docs/').",
            },
            "limit": {"type": "integer", "description": "Max results (default: 10)."},
        },
        "required": ["query"],
    },
}

# viking_read — read a viking:// URI at one of three detail levels (L0/L1/L2).
READ_SCHEMA = {
    "name": "viking_read",
    "description": (
        "Read content at a viking:// URI. Three detail levels:\n"
        " abstract — ~100 token summary (L0)\n"
        " overview — ~2k token key points (L1)\n"
        " full — complete content (L2)\n"
        "Start with abstract/overview, only use full when you need details."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "uri": {"type": "string", "description": "viking:// URI to read."},
            "level": {
                "type": "string", "enum": ["abstract", "overview", "full"],
                "description": "Detail level (default: overview).",
            },
        },
        "required": ["uri"],
    },
}

# viking_browse — filesystem-style navigation (list / tree / stat).
BROWSE_SCHEMA = {
    "name": "viking_browse",
    "description": (
        "Browse the OpenViking knowledge store like a filesystem.\n"
        " list — show directory contents\n"
        " tree — show hierarchy\n"
        " stat — show metadata for a URI"
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string", "enum": ["tree", "list", "stat"],
                "description": "Browse action.",
            },
            "path": {
                "type": "string",
                "description": "Viking URI path (default: viking://). Examples: 'viking://resources/', 'viking://user/memories/'.",
            },
        },
        "required": ["action"],
    },
}

# viking_remember — explicit memory write. The "category" enum must stay in
# sync with the keys of _CATEGORY_SUBDIR_MAP.
REMEMBER_SCHEMA = {
    "name": "viking_remember",
    "description": (
        "Explicitly store a fact or memory in the OpenViking knowledge base. "
        "Use for important information the agent should remember long-term. "
        "The system automatically categorizes and indexes the memory."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "content": {"type": "string", "description": "The information to remember."},
            "category": {
                "type": "string",
                "enum": ["preference", "entity", "event", "case", "pattern"],
                "description": "Memory category (default: auto-detected).",
            },
        },
        "required": ["content"],
    },
}

# viking_add_resource — ingest a remote URL or a local file/directory.
# "to" and "parent" are alternative placements (the description says they
# cannot be combined).
ADD_RESOURCE_SCHEMA = {
    "name": "viking_add_resource",
    "description": (
        "Add a remote URL or local file/directory to the OpenViking knowledge base. "
        "Remote resources must be public http(s), git, or ssh URLs. "
        "Local files are uploaded first using OpenViking temp_upload. "
        "The system automatically parses, indexes, and generates summaries."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "url": {"type": "string", "description": "Remote URL or local file/directory path to add."},
            "reason": {
                "type": "string",
                "description": "Why this resource is relevant (improves search).",
            },
            "to": {
                "type": "string",
                "description": "Optional target viking:// URI for the resource.",
            },
            "parent": {
                "type": "string",
                "description": "Optional parent viking:// URI. Cannot be used with to.",
            },
            "instruction": {
                "type": "string",
                "description": "Optional processing instruction for semantic extraction.",
            },
            "wait": {
                "type": "boolean",
                "description": "Whether to wait for processing to complete.",
            },
            "timeout": {
                "type": "number",
                "description": "Timeout in seconds when wait is true.",
            },
        },
        "required": ["url"],
    },
}
|
|
|
|
|
|
def _zip_directory(dir_path: Path) -> Path:
    """Zip the regular files under *dir_path* into a new temporary archive.

    The archive is written to the system temp dir as
    ``openviking_upload_<uuid>.zip``. Member names are paths relative to
    *dir_path*, with backslashes converted to "/". Symlinks are skipped, as is
    any file whose resolved path lies outside *dir_path*.

    If zipping fails, the partially written archive is deleted before the
    exception propagates, so no stray file is left in the temp dir.

    Returns:
        Path of the created zip file.
    """
    root = dir_path.resolve()
    zip_path = Path(tempfile.gettempdir()) / f"openviking_upload_{uuid.uuid4().hex}.zip"
    try:
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for file_path in dir_path.rglob("*"):
                # Never follow links: they could pull in content from outside the tree.
                if file_path.is_symlink() or not file_path.is_file():
                    continue
                try:
                    # Defence in depth: skip anything whose real path escapes the root.
                    file_path.resolve().relative_to(root)
                except ValueError:
                    continue
                arcname = str(file_path.relative_to(dir_path)).replace("\\", "/")
                zipf.write(file_path, arcname=arcname)
    except BaseException:
        zip_path.unlink(missing_ok=True)
        raise
    return zip_path
|
|
|
|
|
|
def _is_windows_absolute_path(value: str) -> bool:
    """Return True for drive-letter absolute paths such as ``C:\\dir`` or ``d:/dir``."""
    if len(value) < 3:
        return False
    drive, colon, separator = value[0], value[1], value[2]
    return drive.isalpha() and colon == ":" and separator in ("/", "\\")
|
|
|
|
|
|
def _is_remote_resource_source(value: str) -> bool:
    """Return True when *value* starts with one of ``_REMOTE_RESOURCE_PREFIXES``."""
    return any(value.startswith(prefix) for prefix in _REMOTE_RESOURCE_PREFIXES)
|
|
|
|
|
|
def _is_local_path_reference(value: str) -> bool:
    """Heuristically decide whether *value* names a local filesystem path.

    Empty strings, multi-line text and remote sources are rejected. Windows
    drive paths are accepted. Otherwise, any "/" or "\\" in the value counts
    as a path. This also covers the explicit ``/``, ``./``, ``../`` and ``~/``
    prefixes (and their backslash forms), since each contains a separator.
    """
    if not value or any(newline in value for newline in ("\n", "\r")):
        return False
    if _is_remote_resource_source(value):
        return False
    if _is_windows_absolute_path(value):
        return True
    return "/" in value or "\\" in value
|
|
|
|
|
|
def _path_from_file_uri(uri: str) -> Path | str:
|
|
parsed = urlparse(uri)
|
|
if parsed.netloc not in {"", "localhost"}:
|
|
return f"Unsupported non-local file URI: {uri}"
|
|
return Path(url2pathname(parsed.path)).expanduser()
|
|
|
|
|
|
def _clean_config_value(value: Any) -> str:
    """Return *value* with surrounding whitespace removed; non-strings become ""."""
    if not isinstance(value, str):
        return ""
    return value.strip()
|
|
|
|
|
|
def _default_ovcli_config_path() -> Path:
    """Return the active ovcli config path: ``_OVCLI_DEFAULT_RELATIVE_PATH`` under the home directory."""
    return Path.home().joinpath(_OVCLI_DEFAULT_RELATIVE_PATH)
|
|
|
|
|
|
def _resolve_ovcli_config_path(config_path: str = "") -> Path:
    """Choose the ovcli config file to read.

    Precedence: the ``_OVCLI_CONFIG_ENV`` environment variable (when
    non-blank), then *config_path*, then the default ``~/.openviking/ovcli.conf``.
    ``~`` is expanded in the first two.
    """
    override = os.environ.get(_OVCLI_CONFIG_ENV, "").strip()
    chosen = override or config_path
    if chosen:
        return Path(chosen).expanduser()
    return _default_ovcli_config_path()
|
|
|
|
|
|
def _ovcli_config_dir() -> Path:
    """Directory containing the default ovcli config (and its saved ``ovcli.conf.<name>`` siblings)."""
    active_config = _default_ovcli_config_path()
    return active_config.parent
|
|
|
|
|
|
def _load_ovcli_config(path: Optional[Path] = None) -> dict:
    """Read an ovcli JSON config.

    Args:
        path: File to read. Defaults to ``_resolve_ovcli_config_path()``.

    Returns:
        The parsed JSON object, or ``{}`` when the file does not exist.

    Raises:
        ValueError: the file is valid JSON but not an object (malformed JSON
            raises ``json.JSONDecodeError``, also a ValueError).
    """
    target = path if path else _resolve_ovcli_config_path()
    if not target.exists():
        return {}
    parsed = json.loads(target.read_text(encoding="utf-8"))
    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"OpenViking CLI config must be a JSON object: {target}")
|
|
|
|
|
|
def _connection_values_from_ovcli(data: dict) -> dict:
    """Extract normalized connection settings from a parsed ovcli config.

    Returns a dict with ``endpoint``, ``api_key``, ``root_api_key``,
    ``account``, ``user`` and ``agent`` (all strings):

    * ``api_key`` falls back to ``root_api_key``.
    * ``account`` / ``user`` also accept ``account_id`` / ``user_id``.
    * ``agent`` comes from ``actor_peer_id`` or ``agent_id``.
    * Tenant identity (account/user) is only kept when no key is configured
      or the key in use is the root key; otherwise both are "".
    """
    root_api_key = _clean_config_value(data.get("root_api_key"))
    api_key = _clean_config_value(data.get("api_key")) or root_api_key
    send_identity = not api_key or api_key == root_api_key

    def first_clean(*keys: str) -> str:
        # First key whose value is a non-blank string. Each candidate is
        # cleaned separately, so a blank or non-string primary key
        # (e.g. "account": "  ") still falls back to its alias instead of
        # collapsing to "" — matching how api_key falls back above.
        for key in keys:
            cleaned = _clean_config_value(data.get(key))
            if cleaned:
                return cleaned
        return ""

    account = first_clean("account", "account_id")
    user = first_clean("user", "user_id")
    return {
        "endpoint": _normalize_openviking_url(data.get("url")),
        "api_key": api_key,
        "root_api_key": root_api_key,
        "account": account if send_identity else "",
        "user": user if send_identity else "",
        "agent": first_clean("actor_peer_id", "agent_id"),
    }
|
|
|
|
|
|
def _is_valid_ovcli_profile_name(name: str) -> bool:
    """Return True if *name* may be used as a saved-profile suffix (``ovcli.conf.<name>``).

    A valid name is non-empty and made only of ASCII letters, digits, '-' and
    '_'. That alphabet already excludes whitespace, dots (including a leading
    one) and path separators.
    """
    if not name:
        return False
    return re.fullmatch(r"[A-Za-z0-9_-]+", name) is not None
|
|
|
|
|
|
def _validate_openviking_identity_value(value: str, *, field: str) -> tuple[bool, str, str]:
    """Validate an account or user identifier typed during setup.

    Args:
        value: Raw input.
        field: "account" selects account rules/labels; anything else is treated as user.

    Returns:
        ``(ok, error_message, cleaned_value)``. On failure the cleaned value is "".
    """
    is_account = field == "account"
    label = "Account ID" if is_account else "User ID"
    identifier = "account_id" if is_account else "user_id"
    trimmed = value.strip()

    def fail(reason: str) -> tuple[bool, str, str]:
        return False, reason, ""

    if not trimmed:
        return fail(f"{label} cannot be empty.")
    if trimmed != value:
        return fail(f"{label} cannot start or end with whitespace.")
    if is_account and trimmed.startswith("_"):
        return fail("Account ID cannot start with '_'.")
    allowed_punctuation = "_-.@"
    if any(not (ch.isascii() and (ch.isalnum() or ch in allowed_punctuation)) for ch in trimmed):
        return fail(f"{label} can only contain letters, numbers, '_', '-', '.', and '@'.")
    if trimmed.count("@") > 1:
        return fail(f"{identifier} must have at most one '@'.")
    return True, "", trimmed
|
|
|
|
|
|
def _normalize_openviking_url(url: str) -> str:
    """Turn user-entered endpoint text into a usable OpenViking URL.

    * Blank or non-string input -> ``_DEFAULT_ENDPOINT``.
    * Trailing slashes are removed.
    * IPv6 loopback shorthands (``::1``, ``[::1]``, with or without a port)
      -> ``http://[::1]:<port>`` (port defaults to 1933).
    * Values containing ``://`` are returned as-is.
    * Bare ``localhost`` / ``127.0.0.1`` (optionally ``:port``) get an
      ``http://`` scheme and port 1933 by default.
    * Anything else is returned unchanged.
    """
    trimmed = _clean_config_value(url).rstrip("/")
    if not trimmed:
        return _DEFAULT_ENDPOINT
    lowered = trimmed.lower()
    if lowered in ("::1", "[::1]"):
        return "http://[::1]:1933"
    if lowered.startswith(("[::1]:", "::1:")):
        port = trimmed.rsplit(":", 1)[1]
        return f"http://[::1]:{port}"
    if "://" in trimmed:
        return trimmed
    host, _, port = trimmed.partition(":")
    if host.lower() not in ("localhost", "127.0.0.1"):
        return trimmed
    return f"http://{host}:{port or '1933'}"
|
|
|
|
|
|
def _load_profile(path: Path, *, source: str, name: str) -> Optional[_OvcliProfile]:
    """Build an _OvcliProfile from the config at *path*.

    Returns None (after a debug log) when the file cannot be read or parsed.
    """
    try:
        raw = _load_ovcli_config(path)
    except Exception as exc:
        logger.debug("Skipping invalid OpenViking CLI config %s: %s", path, exc)
        return None
    values = _connection_values_from_ovcli(raw)
    return _OvcliProfile(source=source, name=name, path=path, data=raw, values=values)
|
|
|
|
|
|
def _profile_identity(path: Path) -> str:
    """Canonical string key for *path*, used to de-duplicate profiles.

    ``~`` is expanded and the path resolved. If resolution fails with an
    OSError, the expanded (unresolved) path is used.
    """
    expanded = path.expanduser()
    try:
        return str(expanded.resolve())
    except OSError:
        return str(expanded)
|
|
|
|
|
|
def _profiles_equivalent(left: _OvcliProfile, right: _OvcliProfile) -> bool:
    """Profiles are equivalent when their normalized connection values match (path/name/source ignored)."""
    left_values, right_values = left.values, right.values
    return left_values == right_values
|
|
|
|
|
|
def _discover_ovcli_profiles() -> list[_OvcliProfile]:
    """Collect the OpenViking CLI configs available on this machine.

    Order of the returned list:
      1. The file named by the ``_OVCLI_CONFIG_ENV`` environment variable
         (source="env"), if set.
      2. Saved profiles ``ovcli.conf.<name>`` from the ovcli config directory,
         sorted by file name (source="saved"). ``ovcli.conf.bak`` and names
         rejected by ``_is_valid_ovcli_profile_name`` are ignored.
      3. The default ``ovcli.conf`` (source="active"). It is appended only
         when there is no env profile, no saved profile, and its resolved path
         is not already listed.

    The first saved profile whose connection values equal the active config's
    is returned with ``is_active=True``. Missing paths, non-regular files and
    unparseable configs are skipped. The same resolved path is never listed
    twice.
    """
    profiles: list[_OvcliProfile] = []
    # Resolved paths already collected, so aliases/symlinks are listed once.
    seen_paths: set[str] = set()

    def add(path: Path, *, source: str, name: str) -> None:
        # Append the profile at *path* unless it is missing, not a regular
        # file, already collected, or fails to load.
        if not path.exists() or not path.is_file():
            return
        identity = _profile_identity(path)
        if identity in seen_paths:
            return
        profile = _load_profile(path, source=source, name=name)
        if profile is None:
            return
        seen_paths.add(identity)
        profiles.append(profile)

    # 1. Explicit override via environment variable.
    env_path = os.environ.get(_OVCLI_CONFIG_ENV, "").strip()
    if env_path:
        add(Path(env_path).expanduser(), source="env", name=_OVCLI_CONFIG_ENV)

    # The active config is loaded now but only used in step 3: to flag the
    # matching saved profile, or as a fallback entry.
    active_path = _default_ovcli_config_path()
    active_profile = _load_profile(active_path, source="active", name="active") if active_path.exists() else None

    # 2. Saved profiles next to the active config.
    config_dir = _ovcli_config_dir()
    saved_start = len(profiles)
    if config_dir.exists():
        for path in sorted(config_dir.iterdir(), key=lambda item: item.name):
            if not path.is_file():
                continue
            name = path.name.removeprefix(_OVCLI_SAVED_PREFIX)
            # An unchanged name means the prefix was absent (this includes the
            # active "ovcli.conf" itself). "bak" is skipped — presumably a
            # backup copy rather than a profile.
            if name == path.name or name == "bak" or not _is_valid_ovcli_profile_name(name):
                continue
            add(path, source="saved", name=name)

    # 3. Flag the saved profile matching the active config, or fall back to it.
    if active_profile is not None:
        marked_active = False
        for idx in range(saved_start, len(profiles)):
            if profiles[idx].source == "saved" and _profiles_equivalent(profiles[idx], active_profile):
                # _OvcliProfile is frozen, so swap in a flagged copy.
                profiles[idx] = replace(profiles[idx], is_active=True)
                marked_active = True
                break
        has_env_profile = any(profile.source == "env" for profile in profiles)
        has_saved_profile = any(profile.source == "saved" for profile in profiles)
        active_identity = _profile_identity(active_profile.path)
        # Append the active config only when no env/saved profile was found
        # and its resolved path is not already in the list.
        if not marked_active and not has_env_profile and not has_saved_profile and active_identity not in seen_paths:
            profiles.append(active_profile)

    return profiles
|
|
|
|
|
|
def _is_local_openviking_url(value: str) -> bool:
    """True when *value* normalizes to a plain-``http`` URL on a loopback host.

    A missing scheme counts as http. Loopback hosts are those in
    ``_LOCAL_OPENVIKING_HOSTS``.
    """
    normalized = _normalize_openviking_url(value)
    if not normalized:
        return False
    parsed = urlparse(normalized if "://" in normalized else "//" + normalized)
    scheme = parsed.scheme.lower() if parsed.scheme else "http"
    host = (parsed.hostname or "").lower()
    return scheme == "http" and host in _LOCAL_OPENVIKING_HOSTS
|
|
|
|
|
|
def _load_hermes_openviking_config() -> dict:
    """Return a shallow copy of ``memory.openviking`` from the Hermes config.

    Returns ``{}`` when the section is absent or not a mapping, or when
    loading the config raises for any reason.
    """
    try:
        from hermes_cli.config import load_config

        section = load_config()
        for key in ("memory", "openviking"):
            section = section.get(key, {}) if isinstance(section, dict) else {}
        return dict(section) if isinstance(section, dict) else {}
    except Exception:
        return {}
|
|
|
|
|
|
def _env_value(name: str) -> Optional[str]:
    """Return env var *name* stripped of whitespace, or None when it is unset.

    A variable that is set but blank yields "" (not None).
    """
    raw = os.environ.get(name)
    return None if raw is None else raw.strip()
|
|
|
|
|
|
def _first_nonempty(*values: Optional[str], default: str = "") -> str:
    """Return the first truthy value in *values*, else *default*."""
    return next((candidate for candidate in values if candidate), default)
|
|
|
|
|
|
def _resolve_connection_settings(provider_config: Optional[dict] = None) -> dict:
    """Merge OPENVIKING_* env vars with an (optional) ovcli config.

    The ovcli file is consulted only when ``use_ovcli_config`` is truthy in
    *provider_config* (optionally at ``ovcli_config_path``). Precedence:

    * endpoint / agent: first non-empty of env, ovcli, built-in default.
    * api_key / account / user: the env var whenever it is set (even to an
      empty string), else the ovcli value, else "".
    """
    config = dict(provider_config or {})
    from_ovcli: dict = {}
    if config.get("use_ovcli_config"):
        ovcli_file = _resolve_ovcli_config_path(str(config.get("ovcli_config_path") or ""))
        from_ovcli = _connection_values_from_ovcli(_load_ovcli_config(ovcli_file))

    def env_or_ovcli(env_name: str, key: str) -> str:
        env = _env_value(env_name)
        return env if env is not None else from_ovcli.get(key, "")

    return {
        "endpoint": _first_nonempty(
            _env_value("OPENVIKING_ENDPOINT"), from_ovcli.get("endpoint"), default=_DEFAULT_ENDPOINT
        ),
        "api_key": env_or_ovcli("OPENVIKING_API_KEY", "api_key"),
        "account": env_or_ovcli("OPENVIKING_ACCOUNT", "account"),
        "user": env_or_ovcli("OPENVIKING_USER", "user"),
        "agent": _first_nonempty(
            _env_value("OPENVIKING_AGENT"), from_ovcli.get("agent"), default=_DEFAULT_AGENT
        ),
    }
|
|
|
|
|
|
def _env_writes_from_connection_values(values: dict) -> dict:
    """Map connection values to the OPENVIKING_* env vars to write.

    Values that are blank or not strings are omitted entirely.
    """
    env_to_value_key = (
        ("OPENVIKING_ENDPOINT", "endpoint"),
        ("OPENVIKING_API_KEY", "api_key"),
        ("OPENVIKING_ACCOUNT", "account"),
        ("OPENVIKING_USER", "user"),
        ("OPENVIKING_AGENT", "agent"),
    )
    return {
        env_key: cleaned
        for env_key, value_key in env_to_value_key
        if (cleaned := _clean_config_value(values.get(value_key)))
    }
|
|
|
|
|
|
def _restrict_secret_file_permissions(path: Path) -> None:
    """Best-effort chmod of *path* to owner read/write only (0600).

    An OSError is logged at debug level rather than raised.
    """
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        path.chmod(owner_read_write)
    except OSError as err:
        logger.debug("Could not restrict permissions on %s: %s", path, err)
|
|
|
|
|
|
def _precreate_secret_file(path: Path) -> None:
    """Make sure *path* exists with 0600 permissions BEFORE secrets are written into it.

    Writing first and chmod-ing afterwards would leave a window in which a
    freshly created file is world-readable under the default umask
    (e.g. 0644), briefly exposing api_key/root_api_key. A missing file is
    therefore created with mode 0600. Either way the file is then tightened
    to 0600. OSErrors are logged at debug level and otherwise ignored.
    """
    try:
        missing = not path.exists()
        if missing:
            # O_CREAT with mode 0600 (the umask can only remove bits from it).
            fd = os.open(str(path), os.O_CREAT | os.O_WRONLY, 0o600)
            os.close(fd)
        _restrict_secret_file_permissions(path)
    except OSError as err:
        logger.debug("Could not pre-create secret file %s: %s", path, err)
|
|
|
|
|
|
def _write_env_vars(env_path: Path, env_writes: dict, remove_keys: tuple[str, ...] = ()) -> None:
    """Update ``KEY=value`` lines in a .env file in place, keeping unrelated lines.

    Args:
        env_path: .env file to update. It and its parent dir are created if needed.
        env_writes: Keys to set. Existing lines for these keys are rewritten
            and missing ones are appended at the end.
        remove_keys: Keys whose lines are dropped, unless they are also in
            *env_writes*.

    The file is pre-created / kept at 0600 because it holds secrets.
    """
    env_path.parent.mkdir(parents=True, exist_ok=True)
    drop = set(remove_keys).difference(env_writes)
    current = env_path.read_text(encoding="utf-8").splitlines() if env_path.exists() else []

    rewritten: list[str] = []
    seen: set[str] = set()
    for line in current:
        key = line.split("=", 1)[0].strip() if "=" in line else ""
        if key in drop:
            continue
        if key in env_writes:
            rewritten.append(f"{key}={env_writes[key]}")
            seen.add(key)
        else:
            rewritten.append(line)
    rewritten.extend(f"{key}={val}" for key, val in env_writes.items() if key not in seen)

    # Pre-create with 0600 so secrets are never briefly world-readable.
    _precreate_secret_file(env_path)
    body = "\n".join(rewritten)
    env_path.write_text(body + "\n" if rewritten else body, encoding="utf-8")
    _restrict_secret_file_permissions(env_path)
|
|
|
|
|
|
def _remember_ovcli_path(provider_config: dict, ovcli_path: Path) -> None:
    """Record *ovcli_path* in *provider_config* only when it is not implied.

    The ``ovcli_config_path`` key is dropped when no ``_OVCLI_CONFIG_ENV``
    override is set and the path equals the default ovcli config path.
    Otherwise it is stored as a string.
    """
    default_path = _default_ovcli_config_path().expanduser()
    has_env_override = bool(os.environ.get(_OVCLI_CONFIG_ENV, "").strip())
    if not has_env_override and ovcli_path.expanduser() == default_path:
        provider_config.pop("ovcli_config_path", None)
    else:
        provider_config["ovcli_config_path"] = str(ovcli_path)
|
|
|
|
|
|
def _ovcli_data_from_connection_values(values: dict) -> dict:
    """Build the JSON object written to an ovcli config from connection values.

    Always contains ``url`` (normalized, defaulting to ``_DEFAULT_ENDPOINT``).
    ``api_key``, ``root_api_key``, ``account`` and ``user`` are included only
    when non-blank. The agent is written as ``actor_peer_id``, defaulting to
    ``_DEFAULT_AGENT``.
    """
    endpoint = _clean_config_value(values.get("endpoint")) or _DEFAULT_ENDPOINT
    data = {"url": _normalize_openviking_url(endpoint)}
    for key in ("api_key", "root_api_key", "account", "user"):
        cleaned = _clean_config_value(values.get(key))
        if cleaned:
            data[key] = cleaned
    agent = _clean_config_value(values.get("agent")) or _DEFAULT_AGENT
    if agent:
        data["actor_peer_id"] = agent
    return data
|
|
|
|
|
|
def _write_ovcli_config(path: Path, values: dict) -> None:
    """Write *values* as an ovcli config at *path*, creating the parent dir.

    atomic_json_write creates the temp file with mode 0o600 and os.replace()s
    it into place. A crash therefore never leaves a half-written config, and
    there is no chmod-after-write window exposing the api_key/root_api_key.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = _ovcli_data_from_connection_values(values)
    atomic_json_write(path, payload, mode=0o600)
|
|
|
|
|
|
def _validate_openviking_reachability(endpoint: str) -> tuple[bool, str]:
    """Check that an OpenViking server answers its health endpoint.

    Returns:
        ``(True, "")`` when ``/health`` returns a non-empty payload not marked
        ``"healthy": false`` (or, for clients without ``health_payload``, when
        ``health()`` succeeds). Otherwise ``(False, message)``; the message
        distinguishes "server responded with an HTTP error" from "not
        reachable".
    """
    endpoint = _normalize_openviking_url(endpoint)
    try:
        client = _VikingClient(endpoint)
        if not hasattr(client, "health_payload"):
            if client.health():
                return True, ""
        else:
            payload = client.health_payload()
            if payload.get("healthy") is False:
                return False, "OpenViking server responded but reported unhealthy status."
            if payload:
                return True, ""
    except Exception as exc:
        detail = _format_openviking_exception(exc)
        if _status_code_from_error(exc) is None:
            return False, f"OpenViking server is not reachable at {endpoint}: {detail}"
        return False, f"OpenViking server responded with {detail}."
    return False, f"OpenViking server is not reachable at {endpoint}."
|
|
|
|
|
|
def _validate_openviking_auth(values: dict) -> tuple[bool, str]:
    """Probe an authenticated, read-only endpoint with the given credentials.

    Uses the endpoint, api_key, account, user and agent from *values*.
    Returns ``(True, "")`` on success, else ``(False, message)``.
    """
    endpoint = _normalize_openviking_url(values.get("endpoint"))
    try:
        client_kwargs = {
            "account": _clean_config_value(values.get("account")),
            "user": _clean_config_value(values.get("user")),
            "agent": _clean_config_value(values.get("agent")) or _DEFAULT_AGENT,
        }
        client = _VikingClient(endpoint, _clean_config_value(values.get("api_key")), **client_kwargs)
        client.validate_auth()
    except Exception as exc:
        return False, f"OpenViking authentication validation failed: {_format_openviking_exception(exc)}"
    return True, ""
|
|
|
|
|
|
def _validate_openviking_root_access(values: dict) -> tuple[bool, str]:
    """Check whether the ``api_key`` in *values* is accepted for root access.

    Only the endpoint, API key and agent are used; account/user are
    deliberately not sent. Returns ``(True, "")`` when ``validate_root_access``
    succeeds, else ``(False, message)``.
    """
    endpoint = _normalize_openviking_url(values.get("endpoint"))
    try:
        root_probe = _VikingClient(
            endpoint,
            _clean_config_value(values.get("api_key")),
            agent=_clean_config_value(values.get("agent")) or _DEFAULT_AGENT,
        )
        root_probe.validate_root_access()
    except Exception as exc:
        reason = _format_openviking_exception(exc)
        return False, f"OpenViking root API key validation failed: {reason}"
    else:
        return True, ""
|
|
|
|
|
|
def _validate_openviking_user_key_scope(values: dict) -> tuple[bool, str]:
    """Reject a key entered as a *user* key when it actually has root access.

    If the root-access probe fails for any reason, the key is accepted as a
    user key and ``(True, "")`` is returned.
    """
    has_root, _unused = _validate_openviking_root_access(values)
    if has_root:
        return (
            False,
            "That key has ROOT access. Choose Root API key and provide account/user, "
            "or enter a user API key.",
        )
    return True, ""
|
|
|
|
|
|
def _status_code_from_error(error: Exception) -> Optional[int]:
    """Extract an HTTP status code from *error*, or ``None`` if it has none.

    ``_OpenVikingHTTPError`` carries ``status_code`` directly. Other errors are
    checked for a ``response`` attribute that exposes ``status_code``.
    """
    if isinstance(error, _OpenVikingHTTPError):
        return error.status_code
    attached_response = getattr(error, "response", None)
    return getattr(attached_response, "status_code", None)
|
|
|
|
|
|
def _admin_probe_means_regular_key(error: Exception) -> bool:
    """Return True when a failed root/admin probe indicates a non-root key.

    A 401, 403 or 404 from the admin probe means the key authenticated as a
    regular user key rather than being rejected outright.
    """
    status = _status_code_from_error(error)
    return status in {401, 403, 404}
|
|
|
|
|
|
def _should_probe_openviking_auth(health: dict, *, require_api_key: bool, has_api_key: bool) -> bool:
|
|
if require_api_key or has_api_key:
|
|
return True
|
|
auth_mode = health.get("auth_mode")
|
|
if auth_mode == "dev":
|
|
return False
|
|
if auth_mode in {"api_key", "trusted", None}:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _validate_openviking_setup_values(
    values: dict,
    *,
    require_api_key: bool = False,
) -> tuple[bool, str, Optional[str]]:
    """Validate setup values end to end and detect the API key's role.

    Returns ``(ok, error_message, role)``:

    * ``role`` is ``None`` when validation fails or no API key was given.
    * ``"root"`` when ``validate_root_access`` succeeds.
    * ``"user"`` when the root probe is rejected with 401/403/404.

    Any other exception is reported as ``"OpenViking validation failed: ..."``.
    """
    endpoint = _normalize_openviking_url(values.get("endpoint"))
    api_key = _clean_config_value(values.get("api_key"))
    if require_api_key and not api_key:
        return False, "Remote OpenViking configs require an API key.", None

    try:
        client = _VikingClient(
            endpoint,
            api_key,
            account=_clean_config_value(values.get("account")),
            user=_clean_config_value(values.get("user")),
            agent=_clean_config_value(values.get("agent")) or _DEFAULT_AGENT,
        )
        health = client.health_payload()
        if health.get("healthy") is False:
            return False, "OpenViking server responded but reported unhealthy status.", None

        must_probe_auth = _should_probe_openviking_auth(
            health,
            require_api_key=require_api_key,
            has_api_key=bool(api_key),
        )
        if must_probe_auth:
            client.validate_auth()
        if not api_key:
            return True, "", None

        # Classify the key: root if the admin probe succeeds, user if it is
        # rejected with an auth-style status; anything else is a real failure.
        try:
            client.validate_root_access()
        except Exception as probe_error:
            if not _admin_probe_means_regular_key(probe_error):
                raise
            return True, "", "user"
        return True, "", "root"
    except Exception as exc:
        return False, f"OpenViking validation failed: {_format_openviking_exception(exc)}", None
|
|
|
|
|
|
def _retry_or_cancel_manual_setup(select, title: str, message: str, cancelled):
|
|
print(f" {message}")
|
|
choice = select(
|
|
title,
|
|
[
|
|
("Retry", "try this step again"),
|
|
("Cancel setup", "no changes saved"),
|
|
],
|
|
default=0,
|
|
cancel_returns=cancelled,
|
|
)
|
|
if choice == 0:
|
|
return True
|
|
return _SETUP_CANCELLED
|
|
|
|
|
|
def _print_validation_progress(message: str) -> None:
|
|
print(f" {message}", flush=True)
|
|
|
|
|
|
def _local_openviking_bind(endpoint: str) -> tuple[str, int]:
    """Derive the ``(host, port)`` to bind a local openviking-server to.

    Missing parts fall back to ``127.0.0.1`` and ``1933``. ``urlparse().port``
    raises ``ValueError`` on a malformed port, and that propagates to the caller.
    """
    parts = urlparse(_normalize_openviking_url(endpoint))
    return (parts.hostname or "127.0.0.1", parts.port or 1933)
|
|
|
|
|
|
def _openviking_server_log_path() -> Path:
    """Return where the auto-started openviking-server should log.

    The path is built under the Hermes home directory. If ``hermes_constants``
    is unavailable, it falls back to ``$HERMES_HOME`` (user-expanded) or
    ``~/.hermes``.
    """
    try:
        from hermes_constants import get_hermes_home

        base_dir = get_hermes_home()
    except Exception:
        env_home = os.environ.get("HERMES_HOME")
        base_dir = Path(env_home).expanduser() if env_home else Path.home() / ".hermes"
    return base_dir / _OPENVIKING_SERVER_LOG_RELATIVE_PATH
|
|
|
|
|
|
def _start_local_openviking_server(endpoint: str) -> tuple[bool, str]:
    """Launch ``openviking-server`` in the background for a local *endpoint*.

    The child runs in its own session with stdin closed; stdout and stderr are
    appended to the Hermes server log. Returns ``(started, message)``.
    This function does not wait for the server to become healthy.
    """
    executable = shutil.which("openviking-server")
    if not executable:
        return False, "openviking-server was not found on PATH. Start it manually, then retry."

    try:
        host, port = _local_openviking_bind(endpoint)
    except ValueError as exc:
        return False, f"Could not parse local OpenViking URL: {exc}"

    log_path = _openviking_server_log_path()
    command = [executable, "--host", host, "--port", str(port)]
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
        # The child keeps its own copy of the log fd, so closing ours is safe.
        with log_path.open("ab") as sink:
            subprocess.Popen(
                command,
                stdin=subprocess.DEVNULL,
                stdout=sink,
                stderr=sink,
                start_new_session=True,
            )
    except Exception as exc:
        return False, f"Could not start openviking-server: {exc}"

    return True, f"Started openviking-server on {host}:{port} in the background. Logs: {log_path}"
|
|
|
|
|
|
def _wait_for_openviking_health(
    endpoint: str,
    *,
    timeout_seconds: float = 15.0,
    poll_interval: float = 0.5,
) -> bool:
    """Poll *endpoint* until it is reachable or *timeout_seconds* elapse.

    The endpoint is always probed at least once, even for a zero or negative
    timeout. Sleeps are clamped to the time left before the deadline, so the
    wait never overshoots it and a final probe happens near the deadline.

    Args:
        endpoint: OpenViking server URL.
        timeout_seconds: Total time budget for polling.
        poll_interval: Maximum pause between probes.

    Returns:
        True once ``_validate_openviking_reachability`` succeeds, else False.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        ok, _message = _validate_openviking_reachability(endpoint)
        if ok:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        time.sleep(min(poll_interval, remaining))
|
|
|
|
|
|
def _reachability_failure_allows_local_autostart(message: str) -> bool:
    """Return False when *message* says a server responded (it is running).

    Auto-starting a local server only makes sense when nothing answered.
    A ``None`` or empty message counts as "nothing answered".
    """
    text = message or ""
    responded = text.startswith(_OPENVIKING_RESPONDED_FAILURE_PREFIX)
    return not responded
|
|
|
|
|
|
def _handle_unreachable_endpoint(
    endpoint: str,
    message: str,
    select,
    cancelled,
    *,
    allow_local_autostart: bool = True,
):
    """Let the user recover from a failed reachability check during setup.

    Returns:
        ``True`` only when *endpoint* is now usable (a local server was
        auto-started and became reachable). ``False`` when the caller should
        prompt for the URL again. ``_SETUP_CANCELLED`` when the user aborts.
    """
    if _is_local_openviking_url(endpoint) and allow_local_autostart:
        print(f" {message}")
        choice = select(
            " Local OpenViking server is down",
            [
                ("Start local OpenViking", "run openviking-server and retry"),
                ("Retry URL", "enter the server URL again"),
                ("Cancel setup", "no changes saved"),
            ],
            default=0,
            cancel_returns=cancelled,
        )
        if choice == 0:
            started, start_message = _start_local_openviking_server(endpoint)
            print(f" {start_message}")
            if not started:
                return False
            print(" Waiting for OpenViking server to become reachable...", flush=True)
            if _wait_for_openviking_health(
                endpoint,
                timeout_seconds=_LOCAL_OPENVIKING_AUTOSTART_TIMEOUT,
            ):
                print(" OpenViking server is reachable.")
                return True
            print(" OpenViking server did not become reachable.")
            return False
        if choice == 1:
            return False
        return _SETUP_CANCELLED

    # allow_local_autostart=False means the server answered with an error
    # (it is running but unhappy). Otherwise nothing answered at all.
    title = " OpenViking server unreachable" if allow_local_autostart else " OpenViking server unhealthy"
    decision = _retry_or_cancel_manual_setup(select, title, message, cancelled)
    if decision is _SETUP_CANCELLED:
        return _SETUP_CANCELLED
    # "Retry" must re-run the URL step. Returning True here would tell the
    # caller the failing endpoint is usable and skip re-entry.
    return False
|
|
|
|
|
|
def _emit_runtime_warning(message: str, warning_callback=None) -> None:
    """Log *message* at WARNING and forward it to *warning_callback* if given.

    Errors raised by the callback are logged at DEBUG and never propagate.
    """
    logger.warning("%s", message)
    if not warning_callback:
        return
    try:
        warning_callback(message)
    except Exception:
        logger.debug("OpenViking runtime warning callback failed", exc_info=True)
|
|
|
|
|
|
def _emit_runtime_status(message: str, status_callback=None) -> None:
    """Log *message* at INFO and forward it to *status_callback* if given.

    Errors raised by the callback are logged at DEBUG and never propagate.
    """
    logger.info("%s", message)
    if not status_callback:
        return
    try:
        status_callback(message)
    except Exception:
        logger.debug("OpenViking runtime status callback failed", exc_info=True)
|
|
|
|
|
|
def _runtime_openviking_timeout_message(endpoint: str) -> str:
    """Build the warning shown when an auto-started local server never came up."""
    parts = (
        f"Local OpenViking server at {endpoint} is not reachable. ",
        "Tried to start openviking-server, but it did not become reachable ",
        f"within {_LOCAL_OPENVIKING_AUTOSTART_TIMEOUT:.0f} seconds. ",
        "OpenViking memory disabled for this Hermes run.",
    )
    return "".join(parts)
|
|
|
|
|
|
def _classify_runtime_openviking_health(client: _VikingClient, endpoint: str) -> tuple[str, str]:
    """Classify runtime health without treating every false result as server absence.

    Returns ``(state, message)``, where ``state`` is one of:

    * ``"healthy"``: the server answered and did not report itself unhealthy.
    * ``"responded"``: the server is up but unhappy. This covers an explicit
      ``healthy: False`` and any error carrying an HTTP status (an
      ``_OpenVikingHTTPError`` or an exception exposing ``response.status_code``).
      ``message`` explains it.
    * ``"unreachable"``: nothing answered. ``message`` is empty.

    HTTP-status errors are detected the same way as in
    ``_validate_openviking_reachability``, so a running server is never
    mistaken for an absent one (which would trigger a pointless local auto-start).
    """
    try:
        if hasattr(client, "health_payload"):
            payload = client.health_payload()
            if payload.get("healthy") is False:
                return (
                    "responded",
                    f"OpenViking server at {endpoint} responded but reported unhealthy status.",
                )
            return "healthy", ""
        if client.health():
            return "healthy", ""
    except Exception as e:
        if isinstance(e, _OpenVikingHTTPError) or _status_code_from_error(e) is not None:
            return (
                "responded",
                f"OpenViking server at {endpoint} responded with {_format_openviking_exception(e)}.",
            )
        return "unreachable", ""
    return "unreachable", ""
|
|
|
|
|
|
def _prompt_profile_name(prompt, select, cancelled) -> str | object:
    """Ask for an OpenViking profile name until a valid one is entered.

    Returns the cleaned name, or ``_SETUP_CANCELLED`` if the user cancels
    from the retry menu.
    """
    while True:
        candidate = _clean_config_value(prompt("OpenViking profile name"))
        if _is_valid_ovcli_profile_name(candidate):
            return candidate
        outcome = _retry_or_cancel_manual_setup(
            select,
            " Invalid OpenViking profile name",
            "Profile names can only contain letters, numbers, '-' and '_'.",
            cancelled,
        )
        if outcome is _SETUP_CANCELLED:
            return _SETUP_CANCELLED
|
|
|
|
|
|
def _confirm_replace_existing_profile(path: Path, values: dict, select, cancelled):
    """Decide whether writing *values* to *path* may overwrite an existing file.

    Returns ``True`` if there is nothing to overwrite, if the file already
    holds identical data, or if the user chooses "Replace profile". Returns
    ``False`` to pick another name, or ``_SETUP_CANCELLED``. An unreadable
    existing file is compared as an empty dict.
    """
    if not path.exists():
        return True
    try:
        current = _load_ovcli_config(path)
    except Exception:
        current = {}
    if current == _ovcli_data_from_connection_values(values):
        return True

    decision = select(
        " OpenViking profile already exists",
        [
            ("Choose another name", "leave the existing profile unchanged"),
            ("Replace profile", "overwrite this saved OpenViking profile"),
            ("Cancel setup", "no changes saved"),
        ],
        default=0,
        cancel_returns=cancelled,
    )
    if decision == 0:
        return False
    if decision == 1:
        return True
    return _SETUP_CANCELLED
|
|
|
|
|
|
def _prompt_manual_connection_values(prompt, select, cancelled, *, service: bool = False):
    """Interactively collect and validate OpenViking connection values.

    Runs as a small state machine over the setup prompts:

    1. Endpoint. When ``service`` is true this is the fixed managed-service
       URL. Otherwise it is a user-entered URL that must pass a reachability
       check, with ``_handle_unreachable_endpoint`` deciding how to recover.
    2. Credential type. Choices are "No API key" (local endpoints only), a
       user API key, or a root API key. ``service`` forces a user API key.
    3. Root keys are validated first. Account and user are then prompted and
       stored, and the key is copied into ``root_api_key``.
    4. The agent is prompted, then the assembled values get a final check.

    Validation may show that the key's real role differs from the chosen type
    (a "root" key that is really a user key, or vice versa). The user is then
    offered a re-route. The already-entered key (and the agent, on the
    user->root route) is carried over via ``prefilled_*`` so it is not asked
    again.

    Args:
        prompt: Text-input callable, used as ``prompt(label, default=..., secret=...)``.
        select: Menu callable, used as ``select(title, options, default=..., cancel_returns=...)``.
        cancelled: Sentinel that ``select`` returns when a menu is cancelled.
        service: Use the managed OpenViking Service endpoint with a user API key.

    Returns:
        On success, the validated ``values`` dict with keys ``endpoint``,
        ``api_key``, ``root_api_key``, ``account``, ``user``, ``agent``, and
        ``api_key_type`` when a key was used. Otherwise ``_SETUP_CANCELLED``.
    """
    # --- Step 1: settle on an endpoint -------------------------------------
    if service:
        # Managed service: fixed endpoint, no reachability prompt loop.
        endpoint = _OPENVIKING_SERVICE_ENDPOINT
        print(f" OpenViking Service endpoint: {endpoint}")
    else:
        while True:
            endpoint = _normalize_openviking_url(prompt("OpenViking server URL", default=_DEFAULT_ENDPOINT))
            _print_validation_progress("Checking OpenViking server...")
            reachable, message = _validate_openviking_reachability(endpoint)
            if reachable:
                print(" OpenViking server is reachable.")
                break
            # Local auto-start is only offered when no server answered; an
            # HTTP "responded" failure means one is already running.
            retry = _handle_unreachable_endpoint(
                endpoint,
                message,
                select,
                cancelled,
                allow_local_autostart=_reachability_failure_allows_local_autostart(message),
            )
            # True is treated as "use this endpoint as-is"; any other
            # non-cancel result loops back and prompts for the URL again.
            if retry is True:
                break
            if retry is _SETUP_CANCELLED:
                return _SETUP_CANCELLED

    is_local = _is_local_openviking_url(endpoint)
    # "" means the key type has not been chosen yet. The managed service only
    # takes user API keys, so it skips the choice.
    api_key_type = "user" if service else ""
    # Carry-over values for the re-route paths below. Each is consumed (reset
    # to "") the next time it is used.
    prefilled_api_key = ""
    prefilled_agent = ""
    while True:
        # Every attempt starts from a clean slate.
        values = {
            "endpoint": endpoint,
            "api_key": "",
            "root_api_key": "",
            "account": "",
            "user": "",
            "agent": "",
        }
        # --- Step 2: pick a credential type (unless already decided) -------
        if not api_key_type and is_local:
            # Only local endpoints are offered the "No API key" option.
            credential_choice = select(
                " OpenViking credential",
                [
                    ("No API key", "local dev mode"),
                    ("User API key", "server derives account/user automatically"),
                    ("Root API key", "requires account and user IDs"),
                ],
                default=0,
                cancel_returns=cancelled,
            )
            if credential_choice == cancelled:
                return _SETUP_CANCELLED
            if credential_choice == 0:
                # No-key path: only the agent is collected, then validated.
                # This returns without setting "api_key_type".
                values["agent"] = _clean_config_value(
                    prompt("OpenViking agent", default=_DEFAULT_AGENT)
                ) or _DEFAULT_AGENT
                _print_validation_progress("Validating OpenViking local dev access...")
                valid, message, _role = _validate_openviking_setup_values(values)
                if valid:
                    print(" OpenViking local dev access validated.")
                    return values
                retry = _retry_or_cancel_manual_setup(
                    select,
                    " OpenViking credential failed",
                    message,
                    cancelled,
                )
                if retry is _SETUP_CANCELLED:
                    return _SETUP_CANCELLED
                continue
            api_key_type = "root" if credential_choice == 2 else "user"
        elif not api_key_type:
            # Non-local endpoints: a key is required, so only two choices.
            credential_choice = select(
                " OpenViking API key type",
                [
                    ("User API key", "server derives account/user automatically"),
                    ("Root API key", "requires account and user IDs"),
                ],
                default=0,
                cancel_returns=cancelled,
            )
            if credential_choice == cancelled:
                return _SETUP_CANCELLED
            api_key_type = "root" if credential_choice == 1 else "user"

        # --- Step 3: collect the API key -----------------------------------
        values["api_key_type"] = api_key_type
        if service:
            api_key_label = "OpenViking API key"
        else:
            api_key_label = (
                "OpenViking root API key"
                if api_key_type == "root"
                else "OpenViking user API key"
            )
        if prefilled_api_key:
            # Re-route path: reuse the key entered on the previous attempt.
            values["api_key"] = prefilled_api_key
            prefilled_api_key = ""
        else:
            values["api_key"] = _clean_config_value(prompt(api_key_label, secret=True))
        if not values["api_key"]:
            retry = _retry_or_cancel_manual_setup(
                select,
                " OpenViking API key required",
                f"{api_key_label} is required.",
                cancelled,
            )
            if retry is _SETUP_CANCELLED:
                return _SETUP_CANCELLED
            continue

        # --- Step 3b: root keys are checked before asking for tenant IDs ---
        if api_key_type == "root":
            _print_validation_progress("Validating OpenViking root API key...")
            valid, message, role = _validate_openviking_setup_values(values, require_api_key=True)
            root_ok = valid and role == "root"
            if not root_ok:
                if valid and role == "user":
                    # The key works but has no root access; offer to keep it
                    # as a user key instead.
                    print(" That key is valid, but it is a user API key.")
                    route_choice = select(
                        " OpenViking key is a user key",
                        [
                            ("Use as User API key", "server derives account/user automatically"),
                            ("Re-enter Root API key", "try another root key"),
                            ("Cancel setup", "no changes saved"),
                        ],
                        default=0,
                        cancel_returns=cancelled,
                    )
                    if route_choice == 0:
                        prefilled_api_key = values["api_key"]
                        api_key_type = "user"
                        continue
                    if route_choice == 1:
                        api_key_type = "root"
                        continue
                    return _SETUP_CANCELLED
                retry = _retry_or_cancel_manual_setup(
                    select,
                    " OpenViking root API key failed",
                    message,
                    cancelled,
                )
                if retry is _SETUP_CANCELLED:
                    return _SETUP_CANCELLED
                continue
            print(" OpenViking root API key validated.")
            values["root_api_key"] = values["api_key"]
            # Root keys act on behalf of an explicit tenant, so account and
            # user are mandatory here.
            account_ok, account_message, account = _validate_openviking_identity_value(
                prompt("OpenViking account"),
                field="account",
            )
            user_ok, user_message, user = _validate_openviking_identity_value(
                prompt("OpenViking user"),
                field="user",
            )
            values["account"] = account
            values["user"] = user
            if not account_ok or not user_ok:
                message = account_message if not account_ok else user_message
                retry = _retry_or_cancel_manual_setup(
                    select,
                    " OpenViking tenant identity required",
                    message,
                    cancelled,
                )
                if retry is _SETUP_CANCELLED:
                    return _SETUP_CANCELLED
                # Keep the validated key; the next attempt re-validates it and
                # asks for account/user again.
                prefilled_api_key = values["api_key"]
                continue

        # --- Step 4: agent, then the final end-to-end validation -----------
        if prefilled_agent:
            values["agent"] = prefilled_agent
            prefilled_agent = ""
        else:
            values["agent"] = _clean_config_value(
                prompt("OpenViking agent", default=_DEFAULT_AGENT)
            ) or _DEFAULT_AGENT
        _print_validation_progress("Validating OpenViking API access...")
        valid, message, role = _validate_openviking_setup_values(
            values,
            require_api_key=service or not is_local,
        )
        if valid:
            if api_key_type == "user":
                if role == "root":
                    # A "user" key turned out to have root access; offer to
                    # configure it as a root key (keeping key and agent).
                    print(" That key is valid, but it has root access.")
                    route_choice = select(
                        " OpenViking user API key is root key",
                        [
                            ("Configure as Root API key", "provide account and user IDs"),
                            ("Re-enter User API key", "try another user key"),
                            ("Cancel setup", "no changes saved"),
                        ],
                        default=0,
                        cancel_returns=cancelled,
                    )
                    if route_choice == 0:
                        prefilled_api_key = values["api_key"]
                        prefilled_agent = values["agent"]
                        api_key_type = "root"
                        continue
                    if route_choice == 1:
                        api_key_type = "user"
                        continue
                    return _SETUP_CANCELLED
            if api_key_type == "root" and role != "root":
                retry = _retry_or_cancel_manual_setup(
                    select,
                    " OpenViking root API key failed",
                    "The supplied key was not accepted as a root API key.",
                    cancelled,
                )
                if retry is _SETUP_CANCELLED:
                    return _SETUP_CANCELLED
                continue
            print(" OpenViking API access validated.")
            return values
        retry = _retry_or_cancel_manual_setup(
            select,
            " OpenViking API access failed",
            message,
            cancelled,
        )
        if retry is _SETUP_CANCELLED:
            return _SETUP_CANCELLED
|
|
|
|
|
|
def _set_openviking_provider(config: dict, provider_config: dict) -> None:
|
|
config["memory"]["provider"] = "openviking"
|
|
config["memory"]["openviking"] = provider_config
|
|
|
|
|
|
def _link_ovcli_profile(
    *,
    config: dict,
    provider_config: dict,
    env_path: Path,
    ovcli_path: Path,
) -> None:
    """Point Hermes at the OpenViking CLI profile *ovcli_path*.

    Inline connection settings are dropped from *provider_config* and the
    OpenViking keys are removed from both the Hermes ``.env`` file and the
    current process environment, so the linked profile is the only source.
    """
    inline_keys = ("endpoint", "api_key", "root_api_key", "account", "user", "agent", "api_key_type")
    for inline_key in inline_keys:
        provider_config.pop(inline_key, None)
    provider_config["use_ovcli_config"] = True
    _remember_ovcli_path(provider_config, ovcli_path)
    _set_openviking_provider(config, provider_config)

    _write_env_vars(env_path, {}, remove_keys=_OPENVIKING_ENV_KEYS)
    for env_key in _OPENVIKING_ENV_KEYS:
        os.environ.pop(env_key, None)
|
|
|
|
|
|
def _save_hermes_only_config(
    *,
    config: dict,
    provider_config: dict,
    env_path: Path,
    values: dict,
) -> None:
    """Store the connection in Hermes' ``.env`` only, without any ovcli link.

    Existing OpenViking env keys are removed and replaced by those derived
    from *values*.
    """
    provider_config.update(use_ovcli_config=False)
    provider_config.pop("ovcli_config_path", None)
    _set_openviking_provider(config, provider_config)
    env_updates = _env_writes_from_connection_values(values)
    _write_env_vars(env_path, env_updates, remove_keys=_OPENVIKING_ENV_KEYS)
|
|
|
|
|
|
def _profile_display_name(profile: _OvcliProfile) -> str:
    """Return a short label for *profile* in setup menus.

    Env-sourced profiles show the env var name, the active profile shows
    ``ovcli.conf``, and saved profiles show their own name.
    """
    source = profile.source
    if source == "active":
        return "ovcli.conf"
    if source == "env":
        return _OVCLI_CONFIG_ENV
    return profile.name
|
|
|
|
|
|
def _profile_description(profile: _OvcliProfile) -> str:
    """Describe *profile* as ``"<endpoint> (<path>)"`` for setup menus."""
    shown_endpoint = _clean_config_value(profile.values.get("endpoint")) or _DEFAULT_ENDPOINT
    return "{} ({})".format(shown_endpoint, profile.path)
|
|
|
|
|
|
def _validate_profile_for_setup(profile: _OvcliProfile) -> tuple[bool, str, Optional[str]]:
    """Validate an existing profile; non-local endpoints must carry an API key."""
    profile_values = profile.values
    is_local = _is_local_openviking_url(profile_values.get("endpoint", ""))
    return _validate_openviking_setup_values(profile_values, require_api_key=not is_local)
|
|
|
|
|
|
def _print_openviking_ready(message: str, path: Optional[Path] = None) -> None:
|
|
print("\n OpenViking memory is ready")
|
|
print(f" {message}")
|
|
if path is not None:
|
|
print(f" Config file: {path}")
|
|
print(" Start a new Hermes session to activate.\n")
|
|
|
|
|
|
def _run_existing_profile_setup(
    *,
    profiles: list[_OvcliProfile],
    select,
    cancelled,
    config: dict,
    provider_config: dict,
    env_path: Path,
) -> bool | object:
    """Let the user pick a detected ovcli profile, validate it, and link it.

    After a failed validation the user may choose another profile, retry once
    (a second failure returns to the profile list), or cancel.

    Returns ``True`` once a profile is linked, or ``_SETUP_CANCELLED``.
    """

    def _validate_and_link(candidate) -> bool:
        # Validate *candidate*; on success link it and announce readiness,
        # on failure print the reason. Returns whether linking happened.
        _print_validation_progress("Validating OpenViking profile...")
        ok, message, _role = _validate_profile_for_setup(candidate)
        if not ok:
            print(f" {message}")
            return False
        _link_ovcli_profile(
            config=config,
            provider_config=provider_config,
            env_path=env_path,
            ovcli_path=candidate.path,
        )
        _print_openviking_ready(f"Linked profile: {_profile_display_name(candidate)}", candidate.path)
        return True

    while True:
        picked = select(
            " OpenViking profile",
            [(_profile_display_name(profile), _profile_description(profile)) for profile in profiles],
            default=0,
            cancel_returns=cancelled,
        )
        if picked == cancelled:
            return _SETUP_CANCELLED
        if picked < 0 or picked >= len(profiles):
            return _SETUP_CANCELLED

        profile = profiles[picked]
        if _validate_and_link(profile):
            return True

        next_step = select(
            " OpenViking profile validation failed",
            [
                ("Choose another profile", "select a different OpenViking profile"),
                ("Retry validation", "try this profile again"),
                ("Cancel setup", "no changes saved"),
            ],
            default=0,
            cancel_returns=cancelled,
        )
        if next_step == 0:
            continue
        if next_step != 1:
            return _SETUP_CANCELLED
        if _validate_and_link(profile):
            return True
|
|
|
|
|
|
def _mirror_manual_config_to_openviking_store(
    *,
    prompt,
    select,
    cancelled,
    values: dict,
) -> Path | object:
    """Save *values* as ``<ovcli dir>/<prefix><name>`` under a user-chosen name.

    Keeps asking for a name until the target is free, identical, or the user
    agrees to overwrite it. Returns the written path or ``_SETUP_CANCELLED``.
    """
    while True:
        profile_name = _prompt_profile_name(prompt, select, cancelled)
        if profile_name is _SETUP_CANCELLED:
            return _SETUP_CANCELLED
        target = _ovcli_config_dir() / f"{_OVCLI_SAVED_PREFIX}{profile_name}"
        may_write = _confirm_replace_existing_profile(target, values, select, cancelled)
        if may_write is _SETUP_CANCELLED:
            return _SETUP_CANCELLED
        if may_write is not False:
            _write_ovcli_config(target, values)
            return target
|
|
|
|
|
|
def _run_create_profile_setup(
    *,
    prompt,
    select,
    cancelled,
    config: dict,
    provider_config: dict,
    env_path: Path,
) -> bool | object:
    """Create a new OpenViking connection and save it.

    The user picks the managed service or a custom server, enters and
    validates credentials, then chooses to keep them in Hermes' ``.env`` or
    mirror them into a named ovcli profile that gets linked.

    Returns ``True`` when saved, ``False`` when no values were collected, or
    ``_SETUP_CANCELLED``.
    """
    connection_options = [
        ("OpenViking Service (VolcEngine Cloud)", "use the managed OpenViking endpoint"),
        ("Custom", "use a local, VPS, or self-hosted OpenViking server"),
    ]
    source_choice = select(
        " OpenViking connection",
        connection_options,
        default=0,
        cancel_returns=cancelled,
    )
    if source_choice == cancelled:
        return _SETUP_CANCELLED

    values = _prompt_manual_connection_values(prompt, select, cancelled, service=(source_choice == 0))
    if values is _SETUP_CANCELLED:
        return _SETUP_CANCELLED
    if values is None:
        return False

    storage_options = [
        ("Keep in Hermes only", "write values only to Hermes .env"),
        ("Mirror to OpenViking store", "write ~/.openviking/ovcli.conf.<name> and link it"),
    ]
    save_choice = select(
        " Save OpenViking config",
        storage_options,
        default=1,
        cancel_returns=cancelled,
    )
    if save_choice == cancelled:
        return _SETUP_CANCELLED

    if save_choice != 1:
        _save_hermes_only_config(
            config=config,
            provider_config=provider_config,
            env_path=env_path,
            values=values,
        )
        _print_openviking_ready("Connection saved to Hermes .env.")
        return True

    ovcli_path = _mirror_manual_config_to_openviking_store(
        prompt=prompt,
        select=select,
        cancelled=cancelled,
        values=values,
    )
    if ovcli_path is _SETUP_CANCELLED:
        return _SETUP_CANCELLED
    _link_ovcli_profile(
        config=config,
        provider_config=provider_config,
        env_path=env_path,
        ovcli_path=ovcli_path,
    )
    _print_openviking_ready("Created and linked OpenViking profile.", ovcli_path)
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MemoryProvider implementation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class OpenVikingMemoryProvider(MemoryProvider):
|
|
"""Full bidirectional memory via OpenViking context database."""
|
|
|
|
def __init__(self):
    """Create an unconnected provider; ``initialize()`` resolves settings."""
    self._client: Optional[_VikingClient] = None
    self._endpoint = ""
    self._api_key = ""
    # Tenant identity. initialize() overwrites these from the resolved
    # connection settings. Defaulting them here means methods that read them
    # (e.g. the runtime auto-start waiter) never hit AttributeError.
    self._account = ""
    self._user = ""
    self._agent = ""
    self._session_id = ""
    self._turn_count = 0
    # Guards the (_session_id, _turn_count) pair. sync_turn runs on the
    # MemoryManager's background sync executor while on_session_end /
    # on_session_switch run on the caller's thread, so the snapshot+reset
    # of the turn counter and the session-id rotation must be atomic
    # against a concurrent increment. See hermes-agent#28296 review.
    self._session_state_lock = threading.Lock()
    # Commit only after session writes drain. The set is keyed by the sid
    # the writer is POSTing under (snapshotted at spawn), so on_session_end
    # / on_session_switch see every still-alive writer for that sid even
    # if later writes have replaced the latest-tracked thread.
    self._inflight_writers: Dict[str, Set[threading.Thread]] = {}
    self._inflight_lock = threading.Lock()
    self._deferred_commit_sids: Set[str] = set()
    self._deferred_commit_threads: Set[threading.Thread] = set()
    self._deferred_commit_lock = threading.Lock()
    self._committed_session_ids: Set[str] = set()
    self._committed_session_lock = threading.Lock()
    self._prefetch_result = ""
    self._prefetch_lock = threading.Lock()
    self._prefetch_thread: Optional[threading.Thread] = None
    self._runtime_start_lock = threading.Lock()
    self._runtime_start_thread: Optional[threading.Thread] = None
    # All prefetch threads ever spawned (daemon, short-lived). Tracked so
    # shutdown() can drain them and rapid re-queues don't orphan a still-
    # running thread by overwriting the single _prefetch_thread slot.
    self._prefetch_threads: Set[threading.Thread] = set()
    # Set on shutdown so deferred-commit / writer finalizers stop issuing
    # network writes against a torn-down provider.
    self._shutting_down = False
    # Drop prefetch results from older switch generations.
    self._prefetch_generation = 0
|
|
|
|
@property
def name(self) -> str:
    """Registry name of this memory provider."""
    provider_name = "openviking"
    return provider_name
|
|
|
|
def is_available(self) -> bool:
    """Check if OpenViking endpoint is configured. No network calls.

    True when ``OPENVIKING_ENDPOINT`` is set, or when the Hermes config links
    an ovcli profile whose parsed connection values include an endpoint.
    Any error while reading the linked profile counts as unavailable.
    """
    if os.environ.get("OPENVIKING_ENDPOINT"):
        return True
    hermes_cfg = _load_hermes_openviking_config()
    if not hermes_cfg.get("use_ovcli_config"):
        return False
    try:
        profile_path = _resolve_ovcli_config_path(str(hermes_cfg.get("ovcli_config_path") or ""))
        connection = _connection_values_from_ovcli(_load_ovcli_config(profile_path))
        return bool(connection.get("endpoint"))
    except Exception:
        return False
|
|
|
|
def get_config_schema(self):
    """Describe the configurable OpenViking settings and their env vars.

    Defaults come from the module constants (``_DEFAULT_ENDPOINT``,
    ``_DEFAULT_AGENT``), so the schema matches what the runtime and setup
    flow fall back to.
    """
    return [
        {
            "key": "endpoint",
            "description": "OpenViking server URL",
            "required": True,
            "default": _DEFAULT_ENDPOINT,
            "env_var": "OPENVIKING_ENDPOINT",
        },
        {
            "key": "api_key",
            "description": "OpenViking API key (leave blank for local dev mode)",
            "secret": True,
            "env_var": "OPENVIKING_API_KEY",
        },
        {
            "key": "account",
            "description": "OpenViking tenant account ID (blank for user API keys)",
            "env_var": "OPENVIKING_ACCOUNT",
        },
        {
            "key": "user",
            "description": "OpenViking user ID within the account (blank for user API keys)",
            "env_var": "OPENVIKING_USER",
        },
        {
            "key": "agent",
            "description": "OpenViking agent ID within the account ([hermes], useful in multi-agent mode)",
            # Same constant the setup prompts and validators fall back to.
            "default": _DEFAULT_AGENT,
            "env_var": "OPENVIKING_AGENT",
        },
    ]
|
|
|
|
def get_status_config(self, provider_config: dict) -> dict:
    """Build a display-safe view of the provider configuration.

    For an inline (non-ovcli) config, ``api_key`` and ``root_api_key`` are
    masked as ``"(set)"``. For a linked ovcli profile, the resolved endpoint,
    agent, tenant identity and any env-var overrides are shown. If resolving
    the settings fails, the error text is shown instead.
    """
    cfg = dict(provider_config or {})
    if not cfg.get("use_ovcli_config"):
        masked = dict(cfg)
        for secret_key in ("api_key", "root_api_key"):
            if secret_key in masked:
                masked[secret_key] = "(set)"
        return masked

    ovcli_path = _resolve_ovcli_config_path(str(cfg.get("ovcli_config_path") or ""))
    try:
        settings = _resolve_connection_settings(cfg)
    except Exception as exc:
        return {
            "use_ovcli_config": True,
            "ovcli_config_path": str(ovcli_path),
            "error": _format_openviking_exception(exc),
        }

    summary = {
        "use_ovcli_config": True,
        "ovcli_config_path": str(ovcli_path),
        "endpoint": settings.get("endpoint") or _DEFAULT_ENDPOINT,
        "agent": settings.get("agent") or _DEFAULT_AGENT,
    }
    for identity_key in ("account", "user"):
        if settings.get(identity_key):
            summary[identity_key] = settings[identity_key]
    overridden = [env_key for env_key in _OPENVIKING_ENV_KEYS if _env_value(env_key) is not None]
    if overridden:
        summary["env_overrides"] = ", ".join(overridden)
    return summary
|
|
|
|
def post_setup(self, hermes_home: str, config: dict) -> None:
    """Custom setup that can reuse OpenViking's shared CLI config.

    When ovcli profiles are detected, the user chooses between linking one
    and creating a new connection. Otherwise creation starts directly.
    ``config`` is saved only when a flow reports success.
    """
    from hermes_cli.config import save_config
    from hermes_cli.memory_setup import _CANCELLED, _curses_select, _print_cancelled_setup, _prompt

    env_path = Path(hermes_home) / ".env"
    if not isinstance(config.get("memory"), dict):
        config["memory"] = {}
    provider_config = config["memory"].get("openviking", {})
    if not isinstance(provider_config, dict):
        provider_config = {}

    print("\n OpenViking memory setup\n")

    def _conclude(outcome) -> None:
        # Report a cancellation, or persist config after a successful flow.
        if outcome is _SETUP_CANCELLED:
            _print_cancelled_setup()
        elif outcome:
            save_config(config)

    profiles = _discover_ovcli_profiles()
    if not profiles:
        print(" No existing OpenViking CLI profiles found. Creating a new config.")
    else:
        source = _curses_select(
            " OpenViking config source",
            [
                ("Use existing OpenViking profile", "choose from detected ovcli.conf profiles"),
                ("Create new OpenViking profile", "enter a new URL/API key"),
            ],
            default=0,
            cancel_returns=_CANCELLED,
        )
        if source == _CANCELLED:
            _print_cancelled_setup()
            return
        if source == 0:
            _conclude(
                _run_existing_profile_setup(
                    profiles=profiles,
                    select=_curses_select,
                    cancelled=_CANCELLED,
                    config=config,
                    provider_config=provider_config,
                    env_path=env_path,
                )
            )
            return

    _conclude(
        _run_create_profile_setup(
            prompt=_prompt,
            select=_curses_select,
            cancelled=_CANCELLED,
            config=config,
            provider_config=provider_config,
            env_path=env_path,
        )
    )
|
|
|
|
def _start_runtime_openviking_waiter(
    self,
    *,
    status_callback=None,
    warning_callback=None,
) -> None:
    """Spawn the daemon thread that attaches a client once the server is up.

    Holds ``_runtime_start_lock`` so at most one waiter is alive at a time.
    """
    with self._runtime_start_lock:
        current = self._runtime_start_thread
        if current is not None and current.is_alive():
            return
        waiter = threading.Thread(
            target=self._finish_runtime_openviking_start,
            kwargs=dict(status_callback=status_callback, warning_callback=warning_callback),
            daemon=True,
            name="openviking-runtime-start",
        )
        self._runtime_start_thread = waiter
        waiter.start()
|
|
|
|
def _finish_runtime_openviking_start(
    self,
    *,
    status_callback=None,
    warning_callback=None,
) -> None:
    """Background waiter: attach a client once the auto-started server is up.

    Polls the endpoint for up to ``_LOCAL_OPENVIKING_AUTOSTART_TIMEOUT``
    seconds, then builds and health-checks a new client and stores it in
    ``self._client``. Failures are reported through ``warning_callback`` and
    success through ``status_callback``.

    If the provider starts shutting down while this thread waits
    (``_shutting_down``), the thread exits quietly instead of re-attaching a
    client to a torn-down provider.
    """
    endpoint = self._endpoint
    if not _wait_for_openviking_health(
        endpoint,
        timeout_seconds=_LOCAL_OPENVIKING_AUTOSTART_TIMEOUT,
    ):
        if getattr(self, "_shutting_down", False):
            return
        _emit_runtime_warning(
            _runtime_openviking_timeout_message(endpoint),
            warning_callback,
        )
        return

    if getattr(self, "_shutting_down", False):
        logger.debug("OpenViking runtime start finished after shutdown; not attaching client")
        return

    try:
        client = _VikingClient(
            endpoint,
            self._api_key,
            account=self._account,
            user=self._user,
            agent=self._agent,
        )
        if not client.health():
            _emit_runtime_warning(
                f"OpenViking server at {endpoint} is still not reachable after auto-start; "
                "OpenViking memory disabled for this Hermes run.",
                warning_callback,
            )
            return
    except ImportError:
        logger.warning("httpx not installed — OpenViking plugin disabled")
        return
    except Exception as e:
        _emit_runtime_warning(
            f"OpenViking server at {endpoint} could not be attached after auto-start: {e}. "
            "OpenViking memory disabled for this Hermes run.",
            warning_callback,
        )
        return

    # Shutdown may have begun while the client was being built and checked.
    if getattr(self, "_shutting_down", False):
        logger.debug("OpenViking runtime start finished after shutdown; not attaching client")
        return

    self._client = client
    _emit_runtime_status(
        f"Local OpenViking server at {endpoint} is reachable; OpenViking memory is active for later turns.",
        status_callback,
    )
|
|
|
|
def _handle_runtime_openviking_unreachable(
    self,
    *,
    status_callback=None,
    warning_callback=None,
) -> None:
    """React to an unreachable server found during ``initialize``.

    Remote endpoints are never auto-started: a warning is emitted and the
    provider stays detached for this run. For local endpoints,
    ``openviking-server`` is launched in the background. The provider stays
    detached while a waiter thread attaches a client once the server is
    healthy. If the launch fails, a warning is emitted instead.
    """
    endpoint = self._endpoint
    disabled_note = "OpenViking memory disabled for this Hermes run."

    if not _is_local_openviking_url(endpoint):
        _emit_runtime_warning(
            f"Remote OpenViking server at {endpoint} is not reachable; "
            f"{disabled_note} "
            "Check the configured endpoint and network connectivity.",
            warning_callback,
        )
        self._client = None
        return

    started, start_message = _start_local_openviking_server(endpoint)
    if not started:
        _emit_runtime_warning(
            f"Local OpenViking server at {endpoint} is not reachable. {start_message} {disabled_note}",
            warning_callback,
        )
        self._client = None
        return

    self._client = None
    _emit_runtime_status(
        f"{start_message} OpenViking memory is starting in the background and will attach when ready.",
        status_callback,
    )
    self._start_runtime_openviking_waiter(
        status_callback=status_callback,
        warning_callback=warning_callback,
    )
|
|
|
|
def initialize(self, session_id: str, **kwargs) -> None:
    """Load connection settings, health-check the server and attach a client.

    Endpoint/api_key/account/user/agent come from
    ``_resolve_connection_settings(_load_hermes_openviking_config())``.
    Warning and status callbacks from ``kwargs`` are only honoured when
    ``kwargs["platform"] == "cli"``.

    Health-check outcomes:
    - "unreachable": handed to ``_handle_runtime_openviking_unreachable``.
    - Any other non-"healthy" state: disables the provider
      (``_client = None``) with a warning.
    - An ImportError (logged as missing httpx): also disables it.

    Whatever the outcome, this provider is recorded as
    ``_last_active_provider`` for the atexit safety net.
    """
    settings = _resolve_connection_settings(_load_hermes_openviking_config())
    for key in ("endpoint", "api_key", "account", "user", "agent"):
        setattr(self, f"_{key}", settings[key])
    self._session_id = session_id
    self._turn_count = 0

    on_cli = kwargs.get("platform") == "cli"
    warning_callback = kwargs.get("warning_callback") if on_cli else None
    status_callback = kwargs.get("status_callback") if on_cli else None

    try:
        self._client = _VikingClient(
            self._endpoint,
            self._api_key,
            account=self._account,
            user=self._user,
            agent=self._agent,
        )
        state, detail = _classify_runtime_openviking_health(self._client, self._endpoint)
        if state == "unreachable":
            self._handle_runtime_openviking_unreachable(
                status_callback=status_callback,
                warning_callback=warning_callback,
            )
        elif state != "healthy":
            _emit_runtime_warning(
                f"{detail} OpenViking memory disabled for this Hermes run.",
                warning_callback,
            )
            self._client = None
    except ImportError:
        logger.warning("httpx not installed — OpenViking plugin disabled")
        self._client = None

    # Register as the last active provider for atexit safety net
    global _last_active_provider
    _last_active_provider = self
|
|
|
|
def system_prompt_block(self) -> str:
    """Return a short system-prompt section advertising the viking_* tools.

    Returns "" when there is no client, or when a root listing of
    ``viking://`` comes back empty or not as a list. If the listing request
    fails, a warning is logged and a shorter generic tool hint is returned.
    """
    if not self._client:
        return ""
    try:
        # A root listing tells us whether the knowledge base has any content.
        listing = self._client.get("/api/v1/fs/ls", params={"uri": "viking://"})
        entries = listing.get("result", [])
        if not isinstance(entries, list) or not entries:
            return ""
        tool_hint = (
            "Use viking_search to find information, viking_read for details "
            "(abstract/overview/full), viking_browse to explore.\n"
            "Use viking_remember to store facts, viking_add_resource to index URLs/docs."
        )
    except Exception as e:
        logger.warning("OpenViking system_prompt_block failed: %s", e)
        tool_hint = (
            "Use viking_search, viking_read, viking_browse, "
            "viking_remember, viking_add_resource."
        )
    return f"# OpenViking Knowledge Base\nActive. Endpoint: {self._endpoint}\n{tool_hint}"
|
|
|
|
def prefetch(self, query: str, *, session_id: str = "") -> str:
    """Hand out (and clear) the result of the latest background prefetch.

    Waits up to 3 s for the most recent prefetch thread if it is still
    running. Returns "" when nothing was found, otherwise the hits under an
    "## OpenViking Context" heading. ``query`` and ``session_id`` are
    accepted for interface compatibility and not used here.
    """
    pending = self._prefetch_thread
    if pending and pending.is_alive():
        pending.join(timeout=3.0)
    # Take-and-clear atomically so each result is delivered at most once.
    with self._prefetch_lock:
        result, self._prefetch_result = self._prefetch_result, ""
    return f"## OpenViking Context\n{result}" if result else ""
|
|
|
|
def queue_prefetch(self, query: str, *, session_id: str = "") -> None:
    """Fire a background search to pre-load relevant context.

    ``query`` is first reduced via ``_derive_openviking_user_text``. Nothing
    happens without a client or with an empty derived query. The worker:
    - POSTs ``/api/v1/search/find`` with ``limit`` 5.
    - Keeps up to three "memories" and three "resources" hits that have an
      abstract, formatted as ``- [score] abstract (uri)``.
    - Stores them in ``_prefetch_result`` for ``prefetch()`` to pick up,
      unless the prefetch generation changed since queuing (a session
      switch). Those late results are dropped.

    Missing, null or non-numeric scores are shown as 0.00. Null or
    malformed categories/items are skipped instead of aborting the whole
    prefetch. Failures are logged at debug level only.
    """
    query = _derive_openviking_user_text(query)
    if not self._client or not query:
        return

    # Drop prefetch results from older switch generations.
    with self._prefetch_lock:
        gen = self._prefetch_generation

    # One-slot holder: the worker needs its own Thread object to untrack itself.
    holder: List[threading.Thread] = []

    def _run():
        try:
            client = self._new_client()
            resp = client.post("/api/v1/search/find", {
                "query": query,
                "limit": 5,
            })
            result = resp.get("result") or {}
            parts = []
            if isinstance(result, dict):
                for ctx_type in ("memories", "resources"):
                    items = result.get(ctx_type)
                    if not isinstance(items, list):
                        continue
                    for item in items[:3]:
                        if not isinstance(item, dict):
                            continue
                        abstract = item.get("abstract", "")
                        if not abstract:
                            continue
                        uri = item.get("uri", "")
                        score = item.get("score")
                        # A null/non-numeric score would make the :.2f format raise.
                        if not isinstance(score, (int, float)):
                            score = 0.0
                        parts.append(f"- [{score:.2f}] {abstract} ({uri})")
            if parts:
                with self._prefetch_lock:
                    if gen != self._prefetch_generation:
                        return
                    self._prefetch_result = "\n".join(parts)
        except Exception as e:
            logger.debug("OpenViking prefetch failed: %s", e)
        finally:
            with self._prefetch_lock:
                if holder:
                    self._prefetch_threads.discard(holder[0])

    thread = threading.Thread(
        target=_run, daemon=True, name="openviking-prefetch"
    )
    holder.append(thread)
    with self._prefetch_lock:
        self._prefetch_thread = thread
        self._prefetch_threads.add(thread)
    thread.start()
|
|
|
|
def _spawn_writer(self, sid: str, target: Callable[[], None], name: str) -> None:
    """Run ``target`` on a daemon thread tracked under ``_inflight_writers[sid]``.

    Threads are tracked per session id, not in a single latest-thread
    slot, so ``on_session_end`` / ``on_session_switch`` can drain every
    writer of the session being committed. The thread is registered
    before it starts, and it removes itself when ``target`` finishes
    (even on error). The sid entry is dropped once its set is empty.
    """
    def _run_and_untrack() -> None:
        try:
            target()
        finally:
            me = threading.current_thread()
            with self._inflight_lock:
                tracked = self._inflight_writers.get(sid)
                if tracked is not None:
                    tracked.discard(me)
                    if not tracked:
                        del self._inflight_writers[sid]

    worker = threading.Thread(target=_run_and_untrack, daemon=True, name=name)
    with self._inflight_lock:
        self._inflight_writers.setdefault(sid, set()).add(worker)
    worker.start()
|
|
|
|
def _drain_finalizers(self, timeout: float) -> bool:
    """Wait up to ``timeout`` seconds for every async session finalizer.

    Switch-path commits run on daemon finalizer threads so they never
    block the caller's command thread. This lets shutdown and tests wait
    for those commits deterministically. Returns True once no tracked
    finalizer is alive, or False if the budget runs out first.
    """
    deadline = time.monotonic() + timeout
    while True:
        with self._deferred_commit_lock:
            alive = [w for w in self._deferred_commit_threads if w.is_alive()]
        if not alive:
            return True
        if time.monotonic() >= deadline:
            return False
        for worker in alive:
            budget = deadline - time.monotonic()
            if budget <= 0:
                break
            # Each join is capped at 50 ms (a ceiling, not a floor), so the
            # alive set is re-snapshotted regularly and finalizers registered
            # after the first snapshot are waited on too.
            worker.join(timeout=min(budget, 0.05))
|
|
|
|
def _drain_writers(self, sid: str, timeout: float) -> bool:
    """Wait up to ``timeout`` seconds for every in-flight writer of ``sid``.

    An empty ``sid`` has nothing to drain and returns True immediately.
    Returns False if any writer is still alive when the shared budget is
    spent; callers use that to skip the commit.
    """
    if not sid:
        return True
    deadline = time.monotonic() + timeout
    while True:
        with self._inflight_lock:
            pending = [w for w in self._inflight_writers.get(sid, ()) if w.is_alive()]
        if not pending:
            return True
        if time.monotonic() >= deadline:
            return False
        for worker in pending:
            budget = deadline - time.monotonic()
            if budget <= 0:
                break
            worker.join(timeout=budget)
|
|
|
|
def _new_client(self) -> _VikingClient:
    """Build a fresh client for the configured endpoint, API key and tenant."""
    tenant = {"account": self._account, "user": self._user, "agent": self._agent}
    return _VikingClient(self._endpoint, self._api_key, **tenant)
|
|
|
|
@staticmethod
def _text_part(content: str) -> Dict[str, str]:
    """Wrap plain text as a single ``{"type": "text", ...}`` message part."""
    return dict(type="text", text=content)
|
|
|
|
@classmethod
def _turn_batch_payload(cls, user_content: str, assistant_content: str) -> Dict[str, Any]:
    """Build the messages/batch body for one user turn plus the assistant reply."""
    exchange = (("user", user_content), ("assistant", assistant_content))
    return {
        "messages": [
            {"role": role, "parts": [cls._text_part(text)]}
            for role, text in exchange
        ]
    }
|
|
|
|
@classmethod
def _post_session_turn(
    cls,
    client: _VikingClient,
    sid: str,
    user_content: str,
    assistant_content: str,
) -> None:
    """Append one user/assistant exchange to session ``sid`` in a single batch POST."""
    batch = cls._turn_batch_payload(user_content, assistant_content)
    client.post(f"/api/v1/sessions/{sid}/messages/batch", batch)
|
|
|
|
def _session_has_pending_tokens(self, sid: str) -> bool:
    """Ask the server whether session ``sid`` still has uncommitted tokens.

    Returns True only when ``pending_tokens`` parses as a positive int.
    Request failures, non-dict payloads and unparsable counts all count as
    "nothing pending" (False).
    """
    try:
        response = self._client.get(f"/api/v1/sessions/{sid}")
    except Exception:
        return False
    session = self._unwrap_result(response)
    raw = session.get("pending_tokens") if isinstance(session, dict) else None
    try:
        return int(raw or 0) > 0
    except (TypeError, ValueError):
        return False
|
|
|
|
def _has_committed_session(self, sid: str) -> bool:
    """Return True if this provider already committed session ``sid``."""
    with self._committed_session_lock:
        committed = self._committed_session_ids
        return sid in committed
|
|
|
|
def _mark_session_committed(self, sid: str) -> None:
    """Remember that ``sid`` has been committed so it is never committed twice."""
    with self._committed_session_lock:
        self._committed_session_ids.update((sid,))
|
|
|
|
def _session_needs_commit(self, sid: str, turn_count: int) -> bool:
    """Decide whether session ``sid`` still needs a commit.

    An already-committed session never needs another commit, whatever the
    turn counter says. A racing sync_turn can bump ``_turn_count`` after a
    commit+reset, so the committed guard must win. Otherwise any locally
    counted turn is enough. With no local turns, the server is asked
    whether tokens are still pending.
    """
    if self._has_committed_session(sid):
        return False
    return turn_count > 0 or self._session_has_pending_tokens(sid)
|
|
|
|
def _commit_session(self, sid: str, turn_count: int, *, context: str) -> bool:
    """POST a commit for ``sid`` (triggering memory extraction) and record it.

    ``context`` and ``turn_count`` only feed the log line. Returns True on
    success. Any failure is logged as a warning and returns False; the
    commit is best-effort.
    """
    commit_path = f"/api/v1/sessions/{sid}/commit"
    try:
        self._client.post(commit_path)
        self._mark_session_committed(sid)
        logger.info("OpenViking session %s committed %s (%d turns)", sid, context, turn_count)
    except Exception as e:
        logger.warning("OpenViking session commit failed for %s: %s", sid, e)
        return False
    return True
|
|
|
|
def _finalize_session_async(self, sid: str, turn_count: int, *, context: str) -> None:
    """Drain the old session's writers and commit it on a daemon thread.

    Used by on_session_switch (and the deferred-commit fallback) so the
    potentially-multi-second drain + pending-token GET + commit POST never
    runs on the caller's command thread. Deduped by sid so a rapid second
    switch can't stack two finalizers for the same session, and a no-op
    once shutdown has begun so we don't POST against a torn-down client.

    Args:
        sid: Session to finalize. An empty sid is a no-op.
        turn_count: Turn count the caller snapshotted for ``sid``. Passed to
            ``_session_needs_commit`` and ``_commit_session``.
        context: Label included in the commit log line.
    """
    if not sid:
        return
    # Claim the sid under the lock: refuse once shutdown has started, and
    # skip if a finalizer for the same sid is already running.
    with self._deferred_commit_lock:
        if self._shutting_down or sid in self._deferred_commit_sids:
            return
        self._deferred_commit_sids.add(sid)

    # The worker needs its own Thread object to untrack itself, but that
    # object only exists after the closure is defined. This one-slot list is
    # filled right after the Thread is constructed.
    holder: List[threading.Thread] = []

    def _finalize() -> None:
        try:
            # shutdown() may begin at any point, so re-check before each
            # network step.
            if self._shutting_down:
                return
            # Never commit while turn writes for this session are still in flight.
            if not self._drain_writers(sid, timeout=_DEFERRED_COMMIT_TIMEOUT):
                logger.warning(
                    "OpenViking writer for %s still alive after drain — "
                    "leaving session uncommitted",
                    sid,
                )
                return
            if self._shutting_down:
                return
            if self._session_needs_commit(sid, turn_count):
                self._commit_session(sid, turn_count, context=context)
        finally:
            # Always release the dedupe claim and untrack this thread, even on error.
            with self._deferred_commit_lock:
                self._deferred_commit_sids.discard(sid)
                if holder:
                    self._deferred_commit_threads.discard(holder[0])

    thread = threading.Thread(
        target=_finalize,
        daemon=True,
        name=f"openviking-finalize-{sid}",
    )
    holder.append(thread)
    # Register before start() so _drain_finalizers() and shutdown() can see it.
    with self._deferred_commit_lock:
        self._deferred_commit_threads.add(thread)
    thread.start()
|
|
|
|
def _invalidate_prefetch_state(self) -> None:
    """Discard any prefetch result that belongs to a previous session.

    Bumps the prefetch generation under ``_prefetch_lock``, the lock
    prefetch workers use when publishing, so late results from an older
    session are rejected deterministically. Then every tracked prefetch
    thread is joined (up to 3 s each), not just the latest one, because a
    rapid re-queue can leave an older thread running. Finally the result
    slot is cleared once more.
    """
    with self._prefetch_lock:
        self._prefetch_generation += 1
        self._prefetch_result = ""
        # Snapshot under the lock: workers remove themselves from this set.
        running = [w for w in self._prefetch_threads if w.is_alive()]
    for worker in running:
        worker.join(timeout=3.0)
    with self._prefetch_lock:
        self._prefetch_result = ""
|
|
|
|
def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
    """Record one conversation turn in the OpenViking session, without blocking.

    The user text is reduced via ``_derive_openviking_user_text``. Empty
    results and a missing client are ignored. The target sid (``session_id``
    if given, else the current session) is resolved and the turn counter
    bumped under ``_session_state_lock``. The POST itself runs on a tracked
    writer thread, with both texts truncated to 4000 characters. A failed
    POST is retried once with a fresh client; a second failure is logged
    as a warning.
    """
    if not self._client:
        return

    user_content = _derive_openviking_user_text(user_content)
    if not user_content:
        return

    # Resolve sid and count the turn atomically, so a concurrent
    # on_session_switch/on_session_end can't interleave its snapshot+reset
    # between the read and the increment (lost turn), and the turn is
    # attributed to exactly the session it is written to.
    with self._session_state_lock:
        sid = str(session_id or self._session_id).strip()
        if not sid:
            return
        self._turn_count += 1

    def _sync():
        for attempt in range(2):
            try:
                self._post_session_turn(
                    self._new_client(),
                    sid,
                    user_content[:4000],
                    assistant_content[:4000],
                )
            except Exception as exc:
                if attempt == 0:
                    logger.debug("OpenViking sync_turn failed, reconnecting: %s", exc)
                    continue
                logger.warning("OpenViking sync_turn failed: %s", exc)
            return

    self._spawn_writer(sid, _sync, name="openviking-sync")
|
|
|
|
def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
    """Commit the current session so OpenViking extracts memories from it.

    OpenViking automatically extracts 6 categories of memories:
    profile, preferences, entities, events, cases, and patterns.

    This runs at teardown, so the drain and commit happen synchronously.
    If writers do not drain within ``_SESSION_DRAIN_TIMEOUT`` the commit is
    skipped with a warning. After a successful commit the turn counter is
    reset, if the session has not rotated meanwhile, so a follow-up
    on_session_switch does not commit again. ``messages`` is not used.
    """
    if not self._client:
        return

    # Consistent snapshot versus a concurrent sync_turn increment.
    with self._session_state_lock:
        sid, turn_count = self._session_id, self._turn_count

    if not self._drain_writers(sid, timeout=_SESSION_DRAIN_TIMEOUT):
        logger.warning(
            "OpenViking writer for %s still alive after drain — skipping commit",
            sid,
        )
        return

    committed = (
        self._session_needs_commit(sid, turn_count)
        and self._commit_session(sid, turn_count, context="on session end")
    )
    if committed:
        with self._session_state_lock:
            if self._session_id == sid:
                self._turn_count = 0
|
|
|
|
def on_session_switch(
    self,
    new_session_id: str,
    *,
    parent_session_id: str = "",
    reset: bool = False,
    **kwargs,
) -> None:
    """Rotate cached state to the new session_id and commit the old session.

    Fires on /resume, /branch, /reset, /new, and context compression.
    Without this hook, ``_session_id`` stays stuck at the value
    ``initialize()`` cached, so subsequent ``sync_turn()`` writes land in
    the already-closed old session and ``on_session_end()`` tries to
    commit it a second time. The new session never accumulates messages,
    and memory extraction never fires for it. See hermes-agent#28296.

    Under ``_session_state_lock`` the old session id and turn count are
    snapshotted. ``_session_id``/``_turn_count`` are rotated/reset unless
    the switch is a same-session rewind (truthy ``rewound`` in ``kwargs``)
    or ``new_session_id`` equals the current id. Stale prefetch state is
    then invalidated, which may wait briefly for running prefetch threads.
    On a real rotation, the old session is handed to
    ``_finalize_session_async``. That drains its in-flight writers and
    commits it if needed (same extraction semantics as ``on_session_end``)
    on a background thread, so the commit never runs on the caller's thread.

    Args:
        new_session_id: Target session id. Blank/empty is a no-op.
        parent_session_id: Only used in the debug log.
        reset: Only used in the debug log.
        **kwargs: ``rewound`` (truthy) marks a same-session rewind such as /undo.
    """
    new_id = str(new_session_id or "").strip()
    if not new_id or not self._client:
        return

    rewound = bool(kwargs.get("rewound"))

    # Rotate cached session state synchronously (cheap, in-memory) and
    # snapshot the old session under the lock so a concurrent sync_turn
    # either lands fully before the rotation (counted under old) or fully
    # after (counted under new) — never split. The OLD session's commit
    # (drain + pending-token GET + commit POST, potentially many seconds)
    # is then offloaded so /new, /branch, /resume, /undo never block the
    # caller's command thread (cf. the end-of-turn-sync offload in #41945).
    with self._session_state_lock:
        old_session_id = self._session_id
        old_turn_count = self._turn_count
        rotate = not (rewound or new_id == old_session_id)
        if rotate:
            self._session_id = new_id
            self._turn_count = 0

    # Invalidate stale prefetch OUTSIDE the session lock — it takes its own
    # _prefetch_lock and may join a prefetch thread for up to 3s, which we
    # must not do while holding the session lock (would block sync_turn and
    # risk lock-ordering coupling).
    self._invalidate_prefetch_state()

    if not rotate:
        # Same-session rewind (/undo) or no-op rotation: no commit, no
        # counter reset — just the prefetch invalidation above.
        logger.debug(
            "OpenViking on_session_switch invalidated state without rotation: "
            "session=%s rewound=%s",
            old_session_id, rewound,
        )
        return

    # Drain + commit the OLD session off the command thread.
    if old_session_id:
        self._finalize_session_async(old_session_id, old_turn_count, context="on switch")

    logger.debug(
        "OpenViking on_session_switch: old=%s new=%s parent=%s reset=%s",
        old_session_id, new_id, parent_session_id, reset,
    )
|
|
|
|
def _build_memory_uri(self, subdir: str) -> str:
    """Build a unique viking:// memory URI under the configured user/agent/subdir.

    The file name is ``mem_<12 hex chars>.md``, taken from a random UUID4.
    An empty/None ``_user`` (the user override is optional) falls back to
    ``"default"`` so the URI never contains an empty ``user//`` segment.
    NOTE(review): "default" is assumed to match the tenant fallback that
    ``_VikingClient`` applies for an empty user — confirm against the client.
    """
    user = self._user or "default"
    slug = uuid.uuid4().hex[:12]
    return f"viking://user/{user}/agent/{self._agent}/memories/{subdir}/mem_{slug}.md"
|
|
|
|
def on_memory_write(
    self,
    action: str,
    target: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Mirror a built-in memory "add" into OpenViking via content/write.

    Only ``action == "add"`` with non-empty content is mirrored, and only
    while a client is attached. ``target`` selects the memory subdirectory
    (default ``_DEFAULT_MEMORY_SUBDIR``). The write runs on an untracked
    daemon thread and failures are logged at debug level only.
    ``metadata`` is not used.
    """
    if not self._client or action != "add" or not content:
        return

    uri = self._build_memory_uri(
        _MEMORY_WRITE_TARGET_SUBDIR_MAP.get(target, _DEFAULT_MEMORY_SUBDIR)
    )

    def _write():
        request = {"uri": uri, "content": content, "mode": "create"}
        try:
            self._new_client().post("/api/v1/content/write", request)
        except Exception as e:
            logger.debug("OpenViking memory mirror failed: %s", e)

    threading.Thread(target=_write, daemon=True, name="openviking-memwrite").start()
|
|
|
|
def get_tool_schemas(self) -> List[Dict[str, Any]]:
    """Return a new list of the viking_* tool schemas, in a fixed order."""
    schemas = (SEARCH_SCHEMA, READ_SCHEMA, BROWSE_SCHEMA, REMEMBER_SCHEMA, ADD_RESOURCE_SCHEMA)
    return list(schemas)
|
|
|
|
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
    """Dispatch a viking_* tool call to its implementation.

    Returns the tool's JSON string. A missing client, an unknown tool name
    or any exception raised by the tool is reported via ``tool_error``.
    """
    if not self._client:
        return tool_error("OpenViking server not connected")

    try:
        handlers = {
            "viking_search": self._tool_search,
            "viking_read": self._tool_read,
            "viking_browse": self._tool_browse,
            "viking_remember": self._tool_remember,
            "viking_add_resource": self._tool_add_resource,
        }
        handler = handlers.get(tool_name)
        if handler is None:
            return tool_error(f"Unknown tool: {tool_name}")
        return handler(args)
    except Exception as e:
        return tool_error(str(e))
|
|
|
|
def shutdown(self) -> None:
    """Stop new deferred commits and wait for background work to finish.

    ``_shutting_down`` is set first so finalizers bail out before committing
    against a torn-down client. Writers, deferred finalizers and prefetch
    threads are then snapshotted, each under its own lock, and every
    still-alive one is joined for up to 5 s. Finally, the module's atexit
    reference is cleared if it points at this provider, so it does not
    double-commit.
    """
    self._shutting_down = True

    with self._inflight_lock:
        writers = [t for tracked in self._inflight_writers.values() for t in tracked]
    with self._deferred_commit_lock:
        finalizers = list(self._deferred_commit_threads)
    with self._prefetch_lock:
        prefetchers = list(self._prefetch_threads)

    for worker in (*writers, *finalizers, *prefetchers):
        if worker.is_alive():
            worker.join(timeout=5.0)

    global _last_active_provider
    if _last_active_provider is self:
        _last_active_provider = None
|
|
|
|
# -- Tool implementations ------------------------------------------------
|
|
|
|
@staticmethod
def _unwrap_result(resp: Any) -> Any:
    """Return the payload body whether or not it is wrapped in ``{"result": ...}``."""
    if not isinstance(resp, dict) or "result" not in resp:
        return resp
    return resp["result"]
|
|
|
|
@staticmethod
def _normalize_summary_uri(uri: str) -> str:
    """Map a pseudo summary file URI to its parent directory URI.

    ``.../.abstract.md``, ``.overview.md``, ``.read.md`` and ``.full.md``
    are stripped, and an empty remainder becomes ``"viking://"``. Any other
    URI, including a falsy one, is returned unchanged.
    """
    if not uri:
        return uri
    for marker in ("/.abstract.md", "/.overview.md", "/.read.md", "/.full.md"):
        if not uri.endswith(marker):
            continue
        parent = uri[: len(uri) - len(marker)]
        return parent if parent else "viking://"
    return uri
|
|
|
|
def _is_directory_uri(self, uri: str) -> bool | None:
|
|
"""Probe fs/stat to decide if a URI is a directory.
|
|
|
|
Returns True/False when the server answers cleanly, and None when the
|
|
probe itself fails (network error, unexpected shape). Callers should
|
|
treat None as "unknown" and fall back to the exception-based path.
|
|
"""
|
|
try:
|
|
resp = self._client.get("/api/v1/fs/stat", params={"uri": uri})
|
|
except Exception:
|
|
return None
|
|
result = self._unwrap_result(resp)
|
|
if isinstance(result, dict):
|
|
if "isDir" in result:
|
|
return bool(result.get("isDir"))
|
|
if "is_dir" in result:
|
|
return bool(result.get("is_dir"))
|
|
if result.get("type") == "dir":
|
|
return True
|
|
if result.get("type") == "file":
|
|
return False
|
|
return None
|
|
|
|
def _tool_search(self, args: dict) -> str:
    """Run a semantic search and return the hits as a compact JSON string.

    ``args``: ``query`` (required), plus optional ``mode`` (omitted from the
    request when "auto"), ``scope`` (sent as ``target_uri``) and ``limit``.
    Hits from "memories", "resources" and "skills" are merged and sorted by
    score, highest first. A missing, null or non-numeric score reports and
    sorts as 0.0. Each hit gets a singular ``type`` ("memory", "resource" or
    "skill") and, when present, up to three ``related`` URIs. Null or
    malformed categories/items are skipped.
    """
    query = args.get("query", "")
    if not query:
        return tool_error("query is required")

    payload: Dict[str, Any] = {"query": query}
    mode = args.get("mode", "auto")
    if mode != "auto":
        payload["mode"] = mode
    if args.get("scope"):
        payload["target_uri"] = args["scope"]
    if args.get("limit"):
        payload["limit"] = args["limit"]

    resp = self._client.post("/api/v1/search/find", payload)
    result = resp.get("result") or {}
    if not isinstance(result, dict):
        result = {}

    # Explicit singular labels: "memories".rstrip("s") would yield "memorie".
    type_labels = (("memories", "memory"), ("resources", "resource"), ("skills", "skill"))
    scored_entries = []
    for ctx_type, label in type_labels:
        items = result.get(ctx_type) or []
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            raw_score = item.get("score")
            numeric = isinstance(raw_score, (int, float))
            entry = {
                "uri": item.get("uri", ""),
                "type": label,
                "score": round(raw_score, 3) if numeric else 0.0,
                "abstract": item.get("abstract", ""),
            }
            relations = item.get("relations")
            if relations and isinstance(relations, list):
                entry["related"] = [r.get("uri") for r in relations[:3] if isinstance(r, dict)]
            scored_entries.append((raw_score if numeric else 0.0, entry))

    # list.sort is stable, so equal scores keep category order.
    scored_entries.sort(key=lambda x: x[0], reverse=True)
    formatted = [entry for _, entry in scored_entries]

    return json.dumps({
        "results": formatted,
        "total": result.get("total", len(formatted)),
    }, ensure_ascii=False)
|
|
|
|
def _tool_read(self, args: dict) -> str:
    """Read a viking:// URI at the requested level of detail.

    ``level`` "abstract" or "overview" (the default) uses the matching
    summary endpoint. Any other level does a full content/read.

    Summary endpoints are directory-only on OpenViking (v0.3.x answers
    500/412 for file URIs), so for summary reads:
    - A pseudo summary file such as ``viking://user/hermes/.overview.md``
      is mapped to its directory.
    - For other URIs, fs/stat is probed first. A definite "not a directory"
      routes straight to content/read.
    - If the probe was inconclusive and the summary request raises, the read
      is retried once as content/read.

    Content is truncated to 1200 (abstract), 4000 (overview) or 8000
    characters. Returns a JSON string with uri, resolved_uri, level,
    content and, when a full read stood in for a summary,
    ``"fallback": "content/read"``.
    """
    uri = args.get("uri", "")
    if not uri:
        return tool_error("uri is required")

    level = args.get("level", "overview")
    full_read = "/api/v1/content/read"
    summary_routes = {
        "abstract": "/api/v1/content/abstract",
        "overview": "/api/v1/content/overview",
    }
    wants_summary = level in summary_routes
    resolved_uri = self._normalize_summary_uri(uri) if wants_summary else uri
    # Only real (non-pseudo) URIs need the stat probe / retry treatment.
    probe_eligible = wants_summary and resolved_uri == uri

    used_fallback = probe_eligible and self._is_directory_uri(uri) is False
    endpoint = full_read if used_fallback else summary_routes.get(level, full_read)

    try:
        resp = self._client.get(endpoint, params={"uri": resolved_uri})
    except Exception:
        # Summary reads can fail (e.g. HTTP 500) for file URIs such as
        # mem_*.md when the probe was inconclusive: retry as a full read.
        if not probe_eligible or used_fallback:
            raise
        resp = self._client.get(full_read, params={"uri": uri})
        used_fallback = True

    # Content endpoints may return either plain strings or objects.
    body = self._unwrap_result(resp)
    if isinstance(body, str):
        content = body
    elif isinstance(body, dict):
        content = body.get("content", "") or body.get("text", "")
    else:
        content = ""

    limit = {"overview": 4000, "abstract": 1200}.get(level, 8000)
    if len(content) > limit:
        content = content[:limit] + "\n\n[... truncated, use a more specific URI or full level]"

    payload = {"uri": uri, "resolved_uri": resolved_uri, "level": level, "content": content}
    if used_fallback:
        payload["fallback"] = "content/read"
    return json.dumps(payload, ensure_ascii=False)
|
|
|
|
def _tool_browse(self, args: dict) -> str:
    """List, tree or stat a viking:// path and return the result as JSON.

    ``action`` is "list" (default), "tree" or "stat". Unknown actions use the
    ls endpoint but return its raw payload. For list/tree, entries are found
    in a bare list or under ``entries``/``items``/``children``, capped at 50,
    and reduced to name/uri/type/abstract. Payloads without a list are
    returned raw.
    """
    action = args.get("action", "list")
    path = args.get("path", "viking://")

    routes = {"tree": "/api/v1/fs/tree", "list": "/api/v1/fs/ls", "stat": "/api/v1/fs/stat"}
    listing = self._unwrap_result(
        self._client.get(routes.get(action, "/api/v1/fs/ls"), params={"uri": path})
    )

    if action not in {"list", "tree"}:
        return json.dumps(listing, ensure_ascii=False)

    raw = listing
    if isinstance(listing, dict):
        raw = listing.get("entries") or listing.get("items") or listing.get("children") or []
    if not isinstance(raw, list):
        return json.dumps(listing, ensure_ascii=False)

    def _row(e):
        link = e.get("uri", "")
        name = e.get("rel_path") or e.get("name") or (link.rsplit("/", 1)[-1] if link else "")
        folder = e.get("isDir") or e.get("is_dir") or e.get("type") == "dir"
        return {
            "name": name,
            "uri": link,
            "type": "dir" if folder else "file",
            "abstract": e.get("abstract", ""),
        }

    return json.dumps({"path": path, "entries": [_row(e) for e in raw[:50]]}, ensure_ascii=False)
|
|
|
|
def _tool_remember(self, args: dict) -> str:
    """Store ``args["content"]`` as a new memory file via content/write.

    ``category`` selects the memory subdirectory (default
    ``_DEFAULT_MEMORY_SUBDIR``). The write creates the file, stores the
    content and queues vector indexing in a single call. There is no
    dependency on session commit / VLM extraction.

    Only a failed POST is reported as an error. Once the write succeeded,
    the reply is parsed defensively: a missing or null ``result`` or
    ``written_bytes`` reports 0 bytes instead of turning the success into
    a "Failed to store memory" error, which could prompt a duplicate retry.
    """
    content = args.get("content", "")
    if not content:
        return tool_error("content is required")

    category = args.get("category", "")
    subdir = _CATEGORY_SUBDIR_MAP.get(category, _DEFAULT_MEMORY_SUBDIR)
    uri = self._build_memory_uri(subdir)

    try:
        response = self._client.post("/api/v1/content/write", {
            "uri": uri,
            "content": content,
            "mode": "create",
        })
    except Exception as e:
        logger.error("OpenViking content/write failed: %s", e)
        return tool_error(f"Failed to store memory: {e}")

    body = self._unwrap_result(response)
    written = (body.get("written_bytes") or 0) if isinstance(body, dict) else 0
    return json.dumps({
        "status": "stored",
        "message": f"Memory stored ({written}b) and queued for vector indexing.",
    })
|
|
|
|
def _tool_add_resource(self, args: dict) -> str:
    """Index a URL or local file/directory as an OpenViking resource.

    How ``url`` is sent:
    - Remote sources (per ``_is_remote_resource_source``) and other
      non-file URL schemes are sent as ``path``.
    - ``file://`` URIs and plain paths are resolved locally. Files are
      uploaded as-is; directories are zipped to a temporary archive, which
      is removed afterwards, and uploaded. The resulting ``temp_file_id``
      is sent instead of ``path``.
    - A local-looking path that does not exist is rejected.

    Optional ``reason``/``to``/``parent``/``instruction``/``wait``/``timeout``
    args are forwarded when not None/"". ``to`` and ``parent`` are mutually
    exclusive.

    Returns a JSON string with ``status``, ``root_uri`` ("" when the server
    does not report one) and a hint. Once the resource POST succeeded, the
    reply is parsed defensively, so an unexpected shape (e.g.
    ``"result": null``) is not surfaced as a failure.
    """
    url = args.get("url", "")
    if not url:
        return tool_error("url is required")

    if args.get("to") and args.get("parent"):
        return tool_error("Cannot specify both 'to' and 'parent'")

    payload: Dict[str, Any] = {}
    for key in ("reason", "to", "parent", "instruction", "wait", "timeout"):
        if key in args and args[key] not in {None, ""}:
            payload[key] = args[key]

    parsed_url = urlparse(url)
    if _is_remote_resource_source(url):
        source_path = None
    elif parsed_url.scheme == "file":
        source_path = _path_from_file_uri(url)
        # _path_from_file_uri reports conversion problems as an error string.
        if isinstance(source_path, str):
            return tool_error(source_path)
    elif parsed_url.scheme and not _is_windows_absolute_path(url):
        source_path = None
    else:
        source_path = Path(url).expanduser()

    cleanup_path: Optional[Path] = None
    try:
        if source_path is not None:
            if source_path.exists():
                if source_path.is_dir():
                    payload["source_name"] = source_path.name
                    cleanup_path = _zip_directory(source_path)
                    upload_path = cleanup_path
                elif source_path.is_file():
                    payload["source_name"] = source_path.name
                    upload_path = source_path
                else:
                    return tool_error(f"Unsupported local resource path: {url}")
                payload["temp_file_id"] = self._client.upload_temp_file(upload_path)
            elif _is_local_path_reference(url):
                return tool_error(f"Local resource path does not exist: {url}")
            else:
                payload["path"] = url
        else:
            payload["path"] = url

        resp = self._client.post("/api/v1/resources", payload)
    finally:
        # Remove the temporary zip built for a directory upload.
        if cleanup_path:
            cleanup_path.unlink(missing_ok=True)

    body = self._unwrap_result(resp)
    root_uri = body.get("root_uri") if isinstance(body, dict) else None

    return json.dumps({
        "status": "added",
        "root_uri": root_uri or "",
        "message": "Resource queued for processing. Use viking_search after a moment to find it.",
    }, ensure_ascii=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def register(ctx) -> None:
    """Plugin entry point: hand a fresh OpenViking provider to the host context."""
    provider = OpenVikingMemoryProvider()
    ctx.register_memory_provider(provider)
|