From a1ac6d5b1c0d4e38ff6bfd0243efb43b0ca86bba Mon Sep 17 00:00:00 2001 From: bsgdigital Date: Fri, 24 Apr 2026 11:49:53 +0200 Subject: [PATCH 1/9] feat(mem0_oss): add plugins/memory/mem0_oss/README.md --- plugins/memory/mem0_oss/README.md | 200 ++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 plugins/memory/mem0_oss/README.md diff --git a/plugins/memory/mem0_oss/README.md b/plugins/memory/mem0_oss/README.md new file mode 100644 index 000000000..caf12e0e4 --- /dev/null +++ b/plugins/memory/mem0_oss/README.md @@ -0,0 +1,200 @@ +# Mem0 OSS Memory Plugin + +Self-hosted, privacy-first long-term memory using the open-source +[mem0ai](https://github.com/mem0ai/mem0) library. No cloud API key or +external service needed — everything runs on your machine. + +## How it works + +- **LLM fact extraction** — after each conversation turn, mem0 uses an LLM + to extract important facts, preferences, and context from the exchange. +- **Semantic search** — memories are stored in a local [Qdrant](https://qdrant.tech/) + vector database. Searches use embedding-based similarity so natural-language + queries work well. +- **Automatic deduplication** — mem0 merges new facts with existing ones to + avoid duplicate storage. +- **Built-in memory mirroring** — writes via the built-in `memory` tool are + automatically mirrored into mem0 via `on_memory_write`, so nothing is lost + whether you use the native tool or the mem0-specific tools. + +## Setup + +### 1. Install dependencies + +```bash +pip install mem0ai qdrant-client +``` + +### 2. Configure a backend + +**Zero extra config — auto-detect (recommended)** + +If you already have a Hermes provider configured (OpenRouter, Anthropic, OpenAI, +or AWS Bedrock), mem0 OSS will automatically pick it up — no `MEM0_OSS_*` vars +needed. 
The plugin mirrors the standard Hermes auxiliary provider priority order:
+# Override model/embedder explicitly if desired: +export MEM0_OSS_LLM_MODEL=gpt-4o-mini +export MEM0_OSS_EMBEDDER_MODEL=text-embedding-3-small +export MEM0_OSS_EMBEDDER_DIMS=1536 +``` + +**Option D — Ollama (fully local, no API key)** + +```bash +export MEM0_OSS_LLM_PROVIDER=ollama +export MEM0_OSS_LLM_MODEL=llama3.2 +export MEM0_OSS_EMBEDDER_PROVIDER=ollama +export MEM0_OSS_EMBEDDER_MODEL=nomic-embed-text +export MEM0_OSS_EMBEDDER_DIMS=768 +``` + +### 3. Activate + +```yaml +# ~/.hermes/config.yaml +memory: + provider: mem0_oss +``` + +Or use the interactive setup wizard: + +```bash +hermes memory setup # select "mem0_oss" +``` + +## Storage + +| Path | Contents | +|------|----------| +| `$HERMES_HOME/mem0_oss/qdrant/` | Qdrant vector store (all memories) | +| `$HERMES_HOME/mem0_oss/history.db` | mem0 history SQLite database | + +Override with `MEM0_OSS_VECTOR_STORE_PATH` and `MEM0_OSS_HISTORY_DB_PATH`. + +Non-secret settings can also be persisted to `$HERMES_HOME/mem0_oss.json` +by the setup wizard (via `save_config`), or written manually — see +[All configuration options](#all-configuration-options). + +## Agent tools + +| Tool | Description | +|------|-------------| +| `mem0_oss_search` | Semantic search over stored memories | + +Facts are extracted and stored automatically on every conversation turn via +`sync_turn` — no explicit save call needed. + +Writes via the built-in `memory` tool are also mirrored automatically into +mem0 via `on_memory_write`. To explicitly save something mid-session, use +the built-in `memory` tool; it will propagate to mem0 automatically. + +## Concurrent access (WebUI + gateway) + +The plugin uses embedded Qdrant which normally allows only one process at a +time. To avoid conflicts when both the WebUI and the gateway run on the same +host, the plugin creates a fresh `Memory` instance per operation and releases +the Qdrant lock immediately after each call. 
If a brief overlap occurs the +operation is skipped gracefully (logged at DEBUG, not counted as a failure) +rather than raising an error. + +## All configuration options + +### Environment variables + +| Env var | Default | Description | +|---------|---------|-------------| +| `MEM0_OSS_LLM_PROVIDER` | auto-detected | LLM provider (`openrouter`, `openai`, `anthropic`, `ollama`, `aws_bedrock`, …) | +| `MEM0_OSS_LLM_MODEL` | provider default | LLM model id | +| `MEM0_OSS_EMBEDDER_PROVIDER` | mirrors LLM provider | Embedder provider | +| `MEM0_OSS_EMBEDDER_MODEL` | provider default | Embedder model id | +| `MEM0_OSS_EMBEDDER_DIMS` | provider default | Embedding dimensions | +| `MEM0_OSS_COLLECTION` | `hermes` | Qdrant collection name | +| `MEM0_OSS_USER_ID` | `hermes-user` | Memory namespace | +| `MEM0_OSS_TOP_K` | `10` | Default search result count | +| `MEM0_OSS_VECTOR_STORE_PATH` | `$HERMES_HOME/mem0_oss/qdrant` | On-disk Qdrant path | +| `MEM0_OSS_HISTORY_DB_PATH` | `$HERMES_HOME/mem0_oss/history.db` | SQLite history path | +| `MEM0_OSS_API_KEY` | _(auto-detected from provider env var)_ | Explicit API key for the LLM backend | +| `MEM0_OSS_OPENAI_BASE_URL` | _(none)_ | OpenAI-compatible endpoint override | + +### config.yaml (auxiliary.mem0_oss / auxiliary.default) + +`auxiliary.mem0_oss` keys take precedence; any key not set there falls back to +`auxiliary.default` (which is also used by compression, vision, and other aux tasks). + +| Key | Description | +|-----|-------------| +| `provider` | Hermes provider name (see auto-detect order above) | +| `model` | LLM model id | +| `base_url` | Custom OpenAI-compatible endpoint | +| `api_key` | Explicit API key (takes precedence over env vars) | + +### Key resolution priority + +1. `MEM0_OSS_API_KEY` env var +2. `auxiliary.mem0_oss.api_key` in `config.yaml` +3. Provider's standard env var (`OPENROUTER_API_KEY`, `ANTHROPIC_API_KEY`, + `OPENAI_API_KEY`, …) resolved via the Hermes provider registry +4. 
AWS credentials from environment (for Bedrock) + +Or put non-secret settings in `$HERMES_HOME/mem0_oss.json` (keys are the +env-var names without the `MEM0_OSS_` prefix, in snake_case): + +```json +{ + "llm_provider": "aws_bedrock", + "llm_model": "us.anthropic.claude-haiku-4-5-20251001-v1:0", + "embedder_provider": "aws_bedrock", + "embedder_model": "amazon.titan-embed-text-v2:0", + "embedder_dims": 1024, + "collection": "hermes", + "user_id": "hermes-user", + "top_k": 10 +} +``` From eb98e9202e02f33b89e4f29d9b11bf887d878130 Mon Sep 17 00:00:00 2001 From: bsgdigital Date: Fri, 24 Apr 2026 11:49:54 +0200 Subject: [PATCH 2/9] feat(mem0_oss): add plugins/memory/mem0_oss/__init__.py --- plugins/memory/mem0_oss/__init__.py | 966 ++++++++++++++++++++++++++++ 1 file changed, 966 insertions(+) create mode 100644 plugins/memory/mem0_oss/__init__.py diff --git a/plugins/memory/mem0_oss/__init__.py b/plugins/memory/mem0_oss/__init__.py new file mode 100644 index 000000000..24ed00d96 --- /dev/null +++ b/plugins/memory/mem0_oss/__init__.py @@ -0,0 +1,966 @@ +"""Mem0 OSS (self-hosted) memory plugin — MemoryProvider interface. + +LLM-powered fact extraction, semantic vector search, and automatic +deduplication using the open-source ``mem0ai`` library — no cloud API key +required. All data is stored locally on disk. + +Backend choices: + Vector store: Qdrant (local path, no server) — default + LLM / Embedder: resolved from ``auxiliary.mem0_oss`` in config.yaml, then + from ``MEM0_OSS_*`` env vars, then auto-detected. + +Primary config — config.yaml (auxiliary.mem0_oss): + provider — Hermes provider name: "auto", "aws_bedrock", "bedrock", + "openai", "openrouter", "ollama", "anthropic", or "custom". + "auto" follows the standard Hermes auxiliary resolution chain. + model — LLM model id (provider-specific slug). Empty = provider default. + base_url — OpenAI-compatible endpoint (forces provider="custom"). + api_key — API key for that endpoint. Falls back to MEM0_OSS_API_KEY. 
+ +Secondary config — environment variables: + MEM0_OSS_VECTOR_STORE_PATH — on-disk path for Qdrant (default: $HERMES_HOME/mem0_oss/qdrant) + MEM0_OSS_HISTORY_DB_PATH — SQLite history path (default: $HERMES_HOME/mem0_oss/history.db) + MEM0_OSS_COLLECTION — Qdrant collection name (default: hermes) + MEM0_OSS_USER_ID — memory namespace (default: hermes-user) + MEM0_OSS_LLM_PROVIDER — override auxiliary.mem0_oss.provider + MEM0_OSS_LLM_MODEL — override auxiliary.mem0_oss.model + MEM0_OSS_EMBEDDER_PROVIDER — mem0 embedder provider (default: matches llm provider) + MEM0_OSS_EMBEDDER_MODEL — embedder model id + MEM0_OSS_EMBEDDER_DIMS — embedding dimensions (default: auto per provider) + MEM0_OSS_TOP_K — max results returned per search (default: 10) + +Secret config: + MEM0_OSS_API_KEY — dedicated API key for mem0 LLM calls; takes + precedence over auxiliary.mem0_oss.api_key. + Falls back to the provider's standard env var + (OPENAI_API_KEY, ANTHROPIC_API_KEY, + OPENROUTER_API_KEY, etc.) resolved via the + Hermes provider registry — so no extra key is + needed when a main Hermes provider is already + configured. + (AWS Bedrock uses AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_REGION.) + +Optional $HERMES_HOME/mem0_oss.json for non-secret overrides: + { + "llm_provider": "aws_bedrock", + "llm_model": "us.anthropic.claude-haiku-4-5-20251001-v1:0", + "embedder_provider": "aws_bedrock", + "embedder_model": "amazon.titan-embed-text-v2:0", + "embedder_dims": 1024, + "collection": "hermes", + "user_id": "hermes-user", + "top_k": 10 + } +""" + +from __future__ import annotations + +import json +import logging +import os +import threading +import time +from typing import Any, Dict, List, Optional + +from agent.memory_provider import MemoryProvider +from hermes_constants import get_hermes_home +from tools.registry import tool_error + +logger = logging.getLogger(__name__) + +# Circuit breaker: after this many consecutive failures, pause for cooldown. 
+_BREAKER_THRESHOLD = 5 +_BREAKER_COOLDOWN_SECS = 120 + +# Qdrant embedded lock error substring — used to detect contention gracefully. +_QDRANT_LOCK_ERROR = "already accessed by another instance" + + +# --------------------------------------------------------------------------- +# Config helpers +# --------------------------------------------------------------------------- + +def _get_aux_config() -> dict: + """Read auxiliary.mem0_oss from config.yaml, with fallback to auxiliary.default. + + Keys not present in auxiliary.mem0_oss are inherited from auxiliary.default + (if set) so that a single default auxiliary provider covers all aux tasks. + Returns {} on any failure. + """ + try: + from hermes_cli.config import load_config + config = load_config() + except Exception: + return {} + aux = config.get("auxiliary", {}) if isinstance(config, dict) else {} + if not isinstance(aux, dict): + return {} + default = aux.get("default", {}) or {} + task = aux.get("mem0_oss", {}) or {} + # task-specific keys win; default fills in anything not set + merged = {**default, **task} + return merged + + +def _resolve_auto_credentials(aux_provider: str, aux_model: str, + aux_base_url: str, aux_api_key: str): + """When no specific provider is set, fall through to the default auxiliary chain. + + Mirrors the Hermes auxiliary auto-detection priority order so that users + with a main provider configured (OPENROUTER_API_KEY, ANTHROPIC_API_KEY, …) + don't need to also set MEM0_OSS_API_KEY. + + If an explicit provider is already configured (aux_provider is non-empty and + not "auto"), this function is a no-op and returns the inputs unchanged. + + Returns (hermes_provider, model, base_url, api_key) — all strings, never None. 
+ """ + # Only kick in when no explicit provider was configured + if aux_provider and aux_provider.lower() not in ("", "auto"): + return aux_provider, aux_model, aux_base_url, aux_api_key + + # If task-level config.yaml has a specific auxiliary.mem0_oss entry with a + # provider set, _resolve_task_provider_model returns that; otherwise "auto". + # Rather than creating a full OpenAI client we probe env vars directly in + # the same priority order the auxiliary auto-detect chain uses. + try: + from agent.auxiliary_client import _resolve_task_provider_model + h_provider, h_model, h_base_url, h_api_key, _api_mode = ( + _resolve_task_provider_model("mem0_oss") + ) + # If the task config actually resolved a specific non-auto provider, + # use that directly (covers auxiliary.mem0_oss.provider = "openrouter" etc.) + if h_provider and h_provider != "auto": + resolved_provider = h_provider + resolved_model = aux_model or h_model or "" + resolved_base_url = aux_base_url or h_base_url or "" + resolved_api_key = aux_api_key or h_api_key or "" + # Still try to fill missing key from provider registry + if not resolved_api_key and resolved_provider not in ( + "aws_bedrock", "bedrock", "aws", "ollama", "lmstudio"): + try: + from hermes_cli.auth import resolve_api_key_provider_credentials + creds = resolve_api_key_provider_credentials(resolved_provider) + resolved_api_key = str(creds.get("api_key", "") or "").strip() + if not resolved_base_url: + resolved_base_url = str(creds.get("base_url", "") or "").strip() + except Exception: + pass + return resolved_provider, resolved_model, resolved_base_url, resolved_api_key + except Exception: + pass + + # Full auto-detect: first try to mirror the main runtime provider so that + # mem0 uses the same provider as the rest of Hermes. Fall back to env-var + # probe only when the main provider isn't usable for aux tasks. 
+ try: + from agent.auxiliary_client import _read_main_provider + main_provider = (_read_main_provider() or "").strip().lower() + if main_provider in ("bedrock", "aws_bedrock", "aws"): + return "aws_bedrock", aux_model, aux_base_url, aux_api_key + if main_provider == "anthropic": + anthropic_key = os.environ.get("ANTHROPIC_API_KEY", "").strip() + if anthropic_key: + return "anthropic", aux_model, aux_base_url, aux_api_key or anthropic_key + if main_provider == "openai": + openai_key = os.environ.get("OPENAI_API_KEY", "").strip() + if openai_key: + base_url = os.environ.get("OPENAI_BASE_URL", "").strip() + return "openai", aux_model, aux_base_url or base_url, aux_api_key or openai_key + if main_provider == "openrouter": + openrouter_key = os.environ.get("OPENROUTER_API_KEY", "").strip() + if openrouter_key: + base_url = os.environ.get("OPENROUTER_BASE_URL", + "https://openrouter.ai/api/v1").strip() + return "openrouter", aux_model, aux_base_url or base_url, aux_api_key or openrouter_key + except Exception: + pass + + # Fallback env-var probe (no main provider available) + openrouter_key = os.environ.get("OPENROUTER_API_KEY", "").strip() + if openrouter_key: + base_url = os.environ.get("OPENROUTER_BASE_URL", + "https://openrouter.ai/api/v1").strip() + return "openrouter", aux_model, aux_base_url or base_url, aux_api_key or openrouter_key + + anthropic_key = os.environ.get("ANTHROPIC_API_KEY", "").strip() + if anthropic_key: + return "anthropic", aux_model, aux_base_url, aux_api_key or anthropic_key + + openai_key = os.environ.get("OPENAI_API_KEY", "").strip() + if openai_key: + base_url = os.environ.get("OPENAI_BASE_URL", "").strip() + return "openai", aux_model, aux_base_url or base_url, aux_api_key or openai_key + + # Bedrock: no API key needed, boto3 reads from env/profile automatically + if os.environ.get("AWS_ACCESS_KEY_ID") or os.environ.get("AWS_PROFILE"): + return "aws_bedrock", aux_model, aux_base_url, aux_api_key + + # Nothing found — return "auto" and let 
_load_config fall back to aws_bedrock default + return aux_provider or "auto", aux_model, aux_base_url, aux_api_key + + +def _load_config() -> dict: + """Load config from env vars, with $HERMES_HOME/mem0_oss.json overrides. + + Priority for LLM provider/model/api_key (highest → lowest): + 1. MEM0_OSS_LLM_PROVIDER / MEM0_OSS_LLM_MODEL env vars + 2. auxiliary.mem0_oss.provider / .model in config.yaml + 3. Default auxiliary chain (auto-detect from Hermes config) — uses the + provider's standard env var (OPENROUTER_API_KEY, ANTHROPIC_API_KEY, …) + so MEM0_OSS_API_KEY is not required when a main provider is configured. + 4. Defaults (aws_bedrock) + + Priority for API key: + 1. MEM0_OSS_API_KEY env var + 2. auxiliary.mem0_oss.api_key in config.yaml + 3. Provider standard env var (OPENROUTER_API_KEY, ANTHROPIC_API_KEY, etc.) + resolved via the Hermes provider registry. + + Environment variables are the base; the JSON file (if present) overrides + individual keys. Neither source is required — sensible defaults apply. + """ + hermes_home = get_hermes_home() + qdrant_path = str(hermes_home / "mem0_oss" / "qdrant") + history_path = str(hermes_home / "mem0_oss" / "history.db") + + aux = _get_aux_config() + aux_provider = str(aux.get("provider", "") or "").strip() + aux_model = str(aux.get("model", "") or "").strip() + aux_base_url = str(aux.get("base_url", "") or "").strip() + aux_api_key = str(aux.get("api_key", "") or "").strip() + + # MEM0_OSS_API_KEY is the dedicated key; falls back to aux config key, then + # to the provider's standard env var via _resolve_auto_credentials below. + explicit_api_key = ( + os.environ.get("MEM0_OSS_API_KEY", "").strip() + or aux_api_key + ) + # base_url: env var wins, then aux config + explicit_base_url = ( + os.environ.get("MEM0_OSS_OPENAI_BASE_URL", "").strip() + or aux_base_url + ) + + # When no specific provider is configured, fall through to the default + # auxiliary chain so we inherit the user's main Hermes provider + key. 
+ auto_provider, auto_model, auto_base_url, auto_api_key = _resolve_auto_credentials( + aux_provider, aux_model, explicit_base_url, explicit_api_key + ) + + resolved_api_key = explicit_api_key or auto_api_key + resolved_base_url = explicit_base_url or auto_base_url + + # LLM provider: env > aux config > auto-detected > default + default_llm_provider = aux_provider or auto_provider or "openai" + llm_provider = os.environ.get("MEM0_OSS_LLM_PROVIDER", default_llm_provider).strip() + # Normalise Hermes provider aliases → mem0 provider keys + llm_provider = _normalise_provider(llm_provider) + + # LLM model: env > aux config > auto-detected > per-provider default + default_llm_model = aux_model or auto_model or _default_model_for(llm_provider) + llm_model = os.environ.get("MEM0_OSS_LLM_MODEL", default_llm_model).strip() + + # Embedder defaults mirror the LLM provider + default_emb_provider = _default_embedder_provider(llm_provider) + default_emb_model = _default_embedder_model(default_emb_provider) + default_emb_dims = _default_embedder_dims(default_emb_provider) + + config: dict = { + "vector_store_path": os.environ.get("MEM0_OSS_VECTOR_STORE_PATH", qdrant_path), + "history_db_path": os.environ.get("MEM0_OSS_HISTORY_DB_PATH", history_path), + "collection": os.environ.get("MEM0_OSS_COLLECTION", "hermes"), + "user_id": os.environ.get("MEM0_OSS_USER_ID", "hermes-user"), + "llm_provider": llm_provider, + "llm_model": llm_model, + "embedder_provider": _normalise_provider( + os.environ.get("MEM0_OSS_EMBEDDER_PROVIDER", default_emb_provider) + ), + "embedder_model": os.environ.get("MEM0_OSS_EMBEDDER_MODEL", default_emb_model), + "embedder_dims": int(os.environ.get("MEM0_OSS_EMBEDDER_DIMS", str(default_emb_dims))), + "top_k": int(os.environ.get("MEM0_OSS_TOP_K", "10")), + # Resolved credentials / endpoint + "api_key": resolved_api_key, + "base_url": resolved_base_url, + # Legacy key kept for backwards compat with tests and mem0_oss.json + "openai_api_key": resolved_api_key, + 
"openai_base_url": resolved_base_url, + } + + config_path = hermes_home / "mem0_oss.json" + if config_path.exists(): + try: + file_cfg = json.loads(config_path.read_text(encoding="utf-8")) + config.update({k: v for k, v in file_cfg.items() if v is not None and v != ""}) + except Exception as exc: + logger.warning("mem0_oss: failed to read config file %s: %s", config_path, exc) + + return config + + +# --------------------------------------------------------------------------- +# Provider normalisation helpers +# --------------------------------------------------------------------------- + +# Maps Hermes provider names / aliases → mem0 LLM provider keys +_HERMES_TO_MEM0_PROVIDER: dict = { + "bedrock": "aws_bedrock", + "aws": "aws_bedrock", + "aws_bedrock": "aws_bedrock", + "openai": "openai", + "openrouter": "openai", # mem0 uses OpenAI adapter with OR base URL + "anthropic": "anthropic", + "ollama": "ollama", + "lmstudio": "lmstudio", + "custom": "openai", # custom base_url → OpenAI-compatible adapter + "auto": "aws_bedrock", # resolved later in is_available(); placeholder +} + +_PROVIDER_DEFAULTS: dict = { + "aws_bedrock": ("us.anthropic.claude-haiku-4-5-20251001-v1:0", + "aws_bedrock", "amazon.titan-embed-text-v2:0", 1024), + # --- ordering note: openai is the last-resort default (most widely available) --- + "openai": ("gpt-4o-mini", "openai", "text-embedding-3-small", 1536), + "anthropic": ("claude-haiku-4-5-20251001", "openai", "text-embedding-3-small", 1536), + "ollama": ("llama3.1", "ollama", "nomic-embed-text", 768), + "lmstudio": ("llama-3.2-1b-instruct", "openai", "text-embedding-nomic-embed-text-v1.5", 768), + "openrouter": ("openai/gpt-4o-mini", "openai", "text-embedding-3-small", 1536), +} + + +def _normalise_provider(p: str) -> str: + p = (p or "").strip().lower() + return _HERMES_TO_MEM0_PROVIDER.get(p, p) or "openai" + + +def _default_model_for(mem0_provider: str) -> str: + return _PROVIDER_DEFAULTS.get(mem0_provider, 
_PROVIDER_DEFAULTS["openai"])[0] + + +def _default_embedder_provider(mem0_provider: str) -> str: + return _PROVIDER_DEFAULTS.get(mem0_provider, _PROVIDER_DEFAULTS["openai"])[1] + + +def _default_embedder_model(mem0_emb_provider: str) -> str: + for _llm_p, (_, emb_p, emb_m, _) in _PROVIDER_DEFAULTS.items(): + if emb_p == mem0_emb_provider: + return emb_m + return "text-embedding-3-small" + + +def _default_embedder_dims(mem0_emb_provider: str) -> int: + for _llm_p, (_, emb_p, _, emb_d) in _PROVIDER_DEFAULTS.items(): + if emb_p == mem0_emb_provider: + return emb_d + return 1536 + + +def _build_mem0_config(cfg: dict) -> dict: + """Build a mem0 MemoryConfig-compatible dict from our flattened config. + + Translates Hermes/mem0 provider names into the provider-specific config + structures that mem0ai expects, including credentials and base URLs. + """ + llm_provider = cfg["llm_provider"] + llm_model = cfg["llm_model"] + embedder_provider = cfg["embedder_provider"] + embedder_model = cfg["embedder_model"] + embedder_dims = cfg["embedder_dims"] + api_key = cfg.get("api_key") or cfg.get("openai_api_key") or "" + base_url = cfg.get("base_url") or cfg.get("openai_base_url") or "" + + llm_cfg = _build_llm_cfg(llm_provider, llm_model, api_key, base_url) + emb_cfg = _build_embedder_cfg(embedder_provider, embedder_model, embedder_dims, api_key, base_url) + + vs_cfg = { + "collection_name": cfg["collection"], + "path": cfg["vector_store_path"], + "embedding_model_dims": embedder_dims, + "on_disk": True, + } + + return { + "vector_store": { + "provider": "qdrant", + "config": vs_cfg, + }, + "llm": { + "provider": llm_provider, + "config": llm_cfg, + }, + "embedder": { + "provider": embedder_provider, + "config": emb_cfg, + }, + "history_db_path": cfg["history_db_path"], + "version": "v1.1", + } + + +def _build_llm_cfg(provider: str, model: str, api_key: str, base_url: str) -> dict: + """Build the provider-specific LLM config dict for mem0ai.""" + cfg: dict = {"model": model} + + if 
provider == "aws_bedrock": + # Bedrock reads creds from env vars automatically; we don't pass them + # explicitly unless they're set (boto3 picks them up from the environment). + pass + + elif provider in ("openai", "anthropic", "lmstudio"): + if api_key: + cfg["api_key"] = api_key + if base_url and provider == "openai": + cfg["openai_base_url"] = base_url + + elif provider == "ollama": + # Ollama uses openai_base_url pointing at the local server + cfg["openai_base_url"] = base_url or "http://localhost:11434" + + # openrouter is handled as openai with OR base URL — normalised upstream, + # so if it reaches here with provider=="openai" it already has base_url set. + + return cfg + + +def _build_embedder_cfg(provider: str, model: str, dims: int, + api_key: str, base_url: str) -> dict: + """Build the provider-specific embedder config dict for mem0ai.""" + cfg: dict = {"model": model} + + if provider == "aws_bedrock": + cfg["embedding_dims"] = dims + + elif provider in ("openai",): + cfg["embedding_dims"] = dims + if api_key: + cfg["api_key"] = api_key + if base_url: + cfg["openai_base_url"] = base_url + + elif provider == "ollama": + cfg["embedding_dims"] = dims + cfg["ollama_base_url"] = base_url or "http://localhost:11434" + + elif provider == "lmstudio": + cfg["embedding_dims"] = dims + if api_key: + cfg["api_key"] = api_key + + return cfg + + +# --------------------------------------------------------------------------- +# Tool schemas +# --------------------------------------------------------------------------- + +SEARCH_SCHEMA = { + "name": "mem0_oss_search", + "description": ( + "Search long-term memory using semantic similarity. Returns facts and context " + "ranked by relevance. Use this when you need information from past sessions " + "that is not already in the current conversation." 
+ ), + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "What to search for."}, + "top_k": { + "type": "integer", + "description": "Max results (default: 10, max: 50).", + }, + }, + "required": ["query"], + }, +} + +ADD_SCHEMA = { + "name": "mem0_oss_add", + "description": ( + "Store a fact, preference, or piece of context to long-term memory. " + "mem0 deduplicates automatically — safe to call for any important detail." + ), + "parameters": { + "type": "object", + "properties": { + "content": {"type": "string", "description": "The information to store."}, + }, + "required": ["content"], + }, +} + + +# --------------------------------------------------------------------------- +# Provider class +# --------------------------------------------------------------------------- + +class Mem0OSSMemoryProvider(MemoryProvider): + """Self-hosted mem0 memory provider backed by a local Qdrant vector store. + + No cloud account required — all data stays on disk. Uses AWS Bedrock + (or OpenAI / Ollama) for LLM fact-extraction and embedding. + """ + + # -- MemoryProvider identity -------------------------------------------- + + @property + def name(self) -> str: + return "mem0_oss" + + # -- Availability ------------------------------------------------------- + + def is_available(self) -> bool: + """True if mem0ai is installed and at least one LLM backend is usable. + + We only check imports and credentials — no network calls here. 
+ """ + try: + import mem0 # noqa: F401 + except ImportError: + return False + + cfg = _load_config() + llm_provider = cfg.get("llm_provider", "openai") + + if llm_provider == "aws_bedrock": + if os.environ.get("AWS_ACCESS_KEY_ID") or os.environ.get("AWS_PROFILE"): + return True + try: + from agent.bedrock_adapter import has_aws_credentials + return has_aws_credentials() + except Exception: + return False + if llm_provider == "anthropic": + return bool( + cfg.get("api_key") + or os.environ.get("ANTHROPIC_API_KEY") + ) + if llm_provider == "openai": + return bool( + cfg.get("api_key") + or cfg.get("openai_api_key") + or os.environ.get("OPENAI_API_KEY") + ) + if llm_provider in ("ollama", "lmstudio"): + return True # local, always assumed available + # Generic / custom base_url: trust the user's config + return True + + # -- Lifecycle ---------------------------------------------------------- + + def initialize(self, session_id: str, **kwargs) -> None: + """Build the mem0 Memory instance for this session.""" + self._session_id = session_id + self._agent_context = kwargs.get("agent_context", "primary") + self._cfg = _load_config() + self._user_id = self._cfg["user_id"] + self._top_k = self._cfg["top_k"] + + # Circuit-breaker state + self._fail_count = 0 + self._last_fail_ts = 0.0 + self._lock = threading.Lock() + + # Background sync state + self._sync_thread: Optional[threading.Thread] = None + + # Prefetch state (background thread fills this before each turn) + self._prefetch_result: str = "" + self._prefetch_thread: Optional[threading.Thread] = None + import pathlib + pathlib.Path(self._cfg["vector_store_path"]).mkdir(parents=True, exist_ok=True) + pathlib.Path(self._cfg["history_db_path"]).parent.mkdir(parents=True, exist_ok=True) + + def _get_memory(self) -> Any: + """Create a fresh mem0 Memory instance for each call. + + We intentionally do NOT cache the instance. The embedded Qdrant store + uses a file lock that is held for the lifetime of the client object. 
+ When both the WebUI and the gateway run in the same host they would + otherwise compete for the lock — whichever process cached it first + would block all calls in the other process. + + By creating a new instance per call and letting it go out of scope + afterwards, the lock is acquired and released on each operation so + both processes can coexist. The overhead is acceptable: Qdrant + initialisation is fast once the collection already exists on disk. + """ + try: + from mem0 import Memory + from mem0.configs.base import MemoryConfig + + mem0_dict = _build_mem0_config(self._cfg) + mem_cfg = MemoryConfig(**{ + "vector_store": mem0_dict["vector_store"], + "llm": mem0_dict["llm"], + "embedder": mem0_dict["embedder"], + "history_db_path": mem0_dict["history_db_path"], + "version": mem0_dict["version"], + }) + return Memory(config=mem_cfg) + except Exception as exc: + logger.error("mem0_oss: failed to initialize Memory: %s", exc) + raise + + # -- Circuit breaker helpers ------------------------------------------- + + def _is_tripped(self) -> bool: + with self._lock: + if self._fail_count < _BREAKER_THRESHOLD: + return False + if time.monotonic() - self._last_fail_ts >= _BREAKER_COOLDOWN_SECS: + self._fail_count = 0 + return False + return True + + def _record_failure(self) -> None: + with self._lock: + self._fail_count += 1 + self._last_fail_ts = time.monotonic() + + def _record_success(self) -> None: + with self._lock: + self._fail_count = 0 + + # -- System prompt block ----------------------------------------------- + + def system_prompt_block(self) -> str: + return ( + "## Mem0 OSS Memory (self-hosted)\n" + "You have access to long-term memory stored locally via mem0.\n" + "- Use `mem0_oss_search` to recall relevant facts before answering.\n" + "- Facts are extracted and deduplicated automatically on each turn via sync_turn.\n" + "- To explicitly save something, use the built-in `memory` tool — it mirrors to mem0 automatically.\n" + "- Search is semantic — 
natural-language queries work well.\n" + ) + + # -- Prefetch (background recall before each turn) --------------------- + + def queue_prefetch(self, query: str, *, session_id: str = "") -> None: + """Start a background thread to recall context for the upcoming turn.""" + if self._is_tripped(): + return + + self._prefetch_result = "" + self._prefetch_thread = threading.Thread( + target=self._do_prefetch, + args=(query,), + daemon=True, + name="mem0-oss-prefetch", + ) + self._prefetch_thread.start() + + def _do_prefetch(self, query: str) -> None: + try: + mem = self._get_memory() + results = mem.search( + query=query[:500], + top_k=self._top_k, + filters={"user_id": self._user_id}, + ) + memories = _extract_results(results) + if memories: + lines = "\n".join(f"- {m}" for m in memories) + self._prefetch_result = f"Mem0 OSS Memory:\n{lines}" + self._record_success() + except Exception as exc: + if _QDRANT_LOCK_ERROR in str(exc): + logger.debug("mem0_oss: prefetch skipped — Qdrant lock held by another process") + return # not a real failure; don't trip the circuit breaker + self._record_failure() + logger.debug("mem0_oss: prefetch error: %s", exc) + + def prefetch(self, query: str, *, session_id: str = "") -> str: + """Return prefetched results (join background thread first).""" + if self._prefetch_thread is not None: + self._prefetch_thread.join(timeout=15.0) + self._prefetch_thread = None + return self._prefetch_result + + # -- Sync turn (auto-extract after each turn) -------------------------- + + def sync_turn( + self, user_content: str, assistant_content: str, *, session_id: str = "" + ) -> None: + """Spawn a background thread to extract and store facts from the turn.""" + if self._agent_context != "primary": + return + if self._is_tripped(): + return + + messages = [ + {"role": "user", "content": user_content}, + {"role": "assistant", "content": assistant_content}, + ] + self._sync_thread = threading.Thread( + target=self._do_sync, + args=(messages,), + 
daemon=True, + name="mem0-oss-sync", + ) + self._sync_thread.start() + + def _do_sync(self, messages: List[dict]) -> None: + try: + mem = self._get_memory() + mem.add(messages=messages, user_id=self._user_id, infer=True) + self._record_success() + except Exception as exc: + if _QDRANT_LOCK_ERROR in str(exc): + logger.debug("mem0_oss: sync_turn skipped — Qdrant lock held by another process") + return # not a real failure; don't trip the circuit breaker + self._record_failure() + logger.debug("mem0_oss: sync_turn error: %s", exc) + + # -- Tool schemas & dispatch ------------------------------------------- + + def get_tool_schemas(self) -> List[dict]: + return [SEARCH_SCHEMA] + + def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str: + if tool_name == "mem0_oss_search": + return self._handle_search(args) + if tool_name == "mem0_oss_add": + return self._handle_add(args) + return tool_error(f"Unknown tool: {tool_name}") + + def _handle_search(self, args: Dict[str, Any]) -> str: + query = args.get("query", "").strip() + if not query: + return tool_error("mem0_oss_search requires 'query'") + + top_k = min(int(args.get("top_k", self._top_k)), 50) + + try: + mem = self._get_memory() + results = mem.search( + query=query, + top_k=top_k, + filters={"user_id": self._user_id}, + ) + memories = _extract_results(results) + self._record_success() + if not memories: + return json.dumps({"result": "No relevant memories found."}) + return json.dumps({"result": "\n".join(f"- {m}" for m in memories)}) + except Exception as exc: + self._record_failure() + if _QDRANT_LOCK_ERROR in str(exc): + logger.warning("mem0_oss: Qdrant lock held by another process — search skipped") + return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."}) + logger.error("mem0_oss: search error: %s", exc) + return tool_error(f"mem0_oss_search failed: {exc}") + + def _handle_add(self, args: Dict[str, Any]) -> str: + content = 
args.get("content", "").strip() + if not content: + return tool_error("mem0_oss_add requires 'content'") + + try: + mem = self._get_memory() + mem.add( + messages=[{"role": "user", "content": content}], + user_id=self._user_id, + infer=True, + ) + self._record_success() + return json.dumps({"result": "Memory stored successfully."}) + except Exception as exc: + self._record_failure() + if _QDRANT_LOCK_ERROR in str(exc): + logger.warning("mem0_oss: Qdrant lock held by another process — add skipped") + return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."}) + logger.error("mem0_oss: add error: %s", exc) + return tool_error(f"mem0_oss_add failed: {exc}") + + # -- Config schema (for setup wizard) ---------------------------------- + + def get_config_schema(self) -> List[dict]: + return [ + { + "key": "llm_provider", + "label": "LLM provider", + "description": "mem0 LLM provider key (openai, aws_bedrock, ollama, ...)", + "default": "openai", + "env": "MEM0_OSS_LLM_PROVIDER", + "required": False, + }, + { + "key": "llm_model", + "label": "LLM model", + "description": "Model id passed to the LLM provider", + "default": "gpt-4o-mini", + "env": "MEM0_OSS_LLM_MODEL", + "required": False, + }, + { + "key": "embedder_provider", + "label": "Embedder provider", + "description": "mem0 embedder provider key (openai, aws_bedrock, ...)", + "default": "openai", + "env": "MEM0_OSS_EMBEDDER_PROVIDER", + "required": False, + }, + { + "key": "embedder_model", + "label": "Embedding model id", + "description": "Embedding model id", + "default": "text-embedding-3-small", + "env": "MEM0_OSS_EMBEDDER_MODEL", + "required": False, + }, + { + "key": "embedder_dims", + "label": "Embedding dimensions", + "description": "Dimensions of the embedding model (must match the model)", + "default": 1024, + "env": "MEM0_OSS_EMBEDDER_DIMS", + "required": False, + }, + { + "key": "collection", + "label": "Qdrant collection name", + "description": "Name of the 
Qdrant collection storing memories", + "default": "hermes", + "env": "MEM0_OSS_COLLECTION", + "required": False, + }, + { + "key": "user_id", + "label": "User ID", + "description": "Memory namespace / user identifier", + "default": "hermes-user", + "env": "MEM0_OSS_USER_ID", + "required": False, + }, + { + "key": "top_k", + "label": "Top-K results", + "description": "Default number of memories returned per search", + "default": 10, + "env": "MEM0_OSS_TOP_K", + "required": False, + }, + { + "key": "api_key", + "label": "API key (mem0 LLM)", + "description": ( + "Dedicated API key for mem0 LLM/embedder calls. " + "Takes precedence over auxiliary.mem0_oss.api_key in config.yaml " + "and over OPENAI_API_KEY / ANTHROPIC_API_KEY. " + "Not needed for AWS Bedrock (uses AWS_ACCESS_KEY_ID)." + ), + "default": "", + "env": "MEM0_OSS_API_KEY", + "secret": True, + "required": False, + }, + { + "key": "openai_api_key", + "label": "API key (legacy alias)", + "description": "Legacy alias for api_key — prefer MEM0_OSS_API_KEY.", + "default": "", + "env": "MEM0_OSS_OPENAI_API_KEY", + "secret": True, + "required": False, + }, + { + "key": "base_url", + "label": "OpenAI-compatible base URL", + "description": ( + "Custom LLM endpoint (e.g. http://localhost:11434/v1 for Ollama, " + "or an OpenRouter-compatible URL). Also settable via " + "auxiliary.mem0_oss.base_url in config.yaml." + ), + "default": "", + "env": "MEM0_OSS_OPENAI_BASE_URL", + "required": False, + }, + ] + + def save_config(self, values: dict, hermes_home) -> None: + """Write non-secret config to $HERMES_HOME/mem0_oss.json. + + Merges ``values`` into any existing file so that only the supplied keys + are overwritten. Secret keys (api_key, openai_api_key) should be stored + in ``.env`` instead; this method stores them only if explicitly passed. 
+ """ + import json + from pathlib import Path + + config_path = Path(hermes_home) / "mem0_oss.json" + existing: dict = {} + if config_path.exists(): + try: + existing = json.loads(config_path.read_text(encoding="utf-8")) + except Exception: + pass + existing.update(values) + config_path.write_text(json.dumps(existing, indent=2), encoding="utf-8") + + # -- Shutdown ---------------------------------------------------------- + + def on_memory_write(self, action: str, target: str, content: str) -> None: + """Mirror built-in memory tool writes into mem0 store. + + Called by the framework whenever the agent uses the builtin memory tool, + so writes go to mem0 automatically without the agent needing to call + mem0_oss_add explicitly. + """ + if action != "add" or not (content or "").strip(): + return + + def _write(): + try: + mem = self._get_memory() + mem.add( + messages=[{"role": "user", "content": content.strip()}], + user_id=self._user_id, + infer=False, + metadata={"source": "hermes_memory_tool", "target": target}, + ) + except Exception as e: + if _QDRANT_LOCK_ERROR in str(e): + logger.debug("mem0_oss on_memory_write skipped — Qdrant lock held by another process") + return + logger.debug("mem0_oss on_memory_write failed: %s", e) + + t = threading.Thread(target=_write, daemon=True, name="mem0-oss-memwrite") + t.start() + + def shutdown(self) -> None: + """Wait for any in-flight background threads.""" + for thread in (self._sync_thread, self._prefetch_thread): + if thread is not None and thread.is_alive(): + thread.join(timeout=10.0) + + +# --------------------------------------------------------------------------- +# Result extraction helper +# --------------------------------------------------------------------------- + +def _extract_results(results: Any) -> List[str]: + """Normalize mem0 search results (v1 list or v2 dict) to plain strings.""" + if isinstance(results, dict) and "results" in results: + items = results["results"] + elif isinstance(results, list): + 
items = results + else: + return [] + + memories = [] + for item in items: + if isinstance(item, dict): + mem = item.get("memory") or item.get("text") or "" + else: + mem = str(item) + if mem: + memories.append(mem) + return memories + + +# --------------------------------------------------------------------------- +# Plugin registration +# --------------------------------------------------------------------------- + +def register(ctx) -> None: + ctx.register_memory_provider(Mem0OSSMemoryProvider()) From 1ac1e5782bd68be3d374f865fc5f964a87a05441 Mon Sep 17 00:00:00 2001 From: bsgdigital Date: Fri, 24 Apr 2026 11:49:56 +0200 Subject: [PATCH 3/9] feat(mem0_oss): add tests/plugins/memory/test_mem0_oss_provider.py --- .../plugins/memory/test_mem0_oss_provider.py | 1044 +++++++++++++++++ 1 file changed, 1044 insertions(+) create mode 100644 tests/plugins/memory/test_mem0_oss_provider.py diff --git a/tests/plugins/memory/test_mem0_oss_provider.py b/tests/plugins/memory/test_mem0_oss_provider.py new file mode 100644 index 000000000..66c3d17ce --- /dev/null +++ b/tests/plugins/memory/test_mem0_oss_provider.py @@ -0,0 +1,1044 @@ +"""Tests for the Mem0 OSS self-hosted memory provider plugin. + +Covers config loading, _build_mem0_config, _extract_results, tool handlers +(search, add, unknown), circuit breaker, prefetch, sync_turn, system prompt, +schema completeness, availability checks, and shutdown. + +All tests are fully offline — mem0.Memory is always mocked. 
+
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import threading
+import time
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch, PropertyMock
+
+import pytest
+
+from plugins.memory.mem0_oss import (
+    Mem0OSSMemoryProvider,
+    SEARCH_SCHEMA,
+    ADD_SCHEMA,
+    _extract_results,
+    _load_config,
+    _build_mem0_config,
+)
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture(autouse=True)
+def _clean_env(monkeypatch):
+    """Wipe all MEM0_OSS_* env vars between tests."""
+    for key in list(os.environ):
+        if key.startswith("MEM0_OSS_"):
+            monkeypatch.delenv(key, raising=False)
+
+
+def _make_mock_memory(search_results=None, add_raises=None):
+    """Build a mock mem0.Memory with controllable behaviour."""
+    mem = MagicMock()
+
+    default_search = [{"memory": "User likes dark mode"}, {"memory": "Project: hermes-agent"}]
+    mem.search.return_value = search_results if search_results is not None else default_search
+
+    if add_raises:
+        mem.add.side_effect = add_raises
+    else:
+        mem.add.return_value = [{"id": "abc123", "memory": "stored"}]
+
+    return mem
+
+
+@pytest.fixture()
+def provider(tmp_path, monkeypatch):
+    """Initialized provider with a mocked Memory instance.
+
+    _get_memory() is patched to return the same MagicMock on every call so that
+    tests don't trigger real Qdrant initialisation. The mock is also exposed as
+    ``provider._mock_mem`` for assertion convenience.
+ """ + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + with patch("plugins.memory.mem0_oss._load_config") as mock_cfg: + mock_cfg.return_value = { + "vector_store_path": str(tmp_path / "qdrant"), + "history_db_path": str(tmp_path / "history.db"), + "collection": "hermes", + "user_id": "test-user", + "llm_provider": "aws_bedrock", + "llm_model": "us.anthropic.claude-haiku-4-5-20250714-v1:0", + "embedder_provider": "aws_bedrock", + "embedder_model": "amazon.titan-embed-text-v2:0", + "embedder_dims": 1024, + "top_k": 10, + "openai_api_key": "", + "openai_base_url": "", + } + p = Mem0OSSMemoryProvider() + p.initialize("test-session") + + mock_mem = _make_mock_memory() + # Patch _get_memory so every call returns the same mock without hitting Qdrant. + p._get_memory = lambda: mock_mem # type: ignore[assignment] + # Expose the mock for test assertions. + p._mock_mem = mock_mem # type: ignore[attr-defined] + return p + + +# --------------------------------------------------------------------------- +# _extract_results +# --------------------------------------------------------------------------- + + +class TestExtractResults: + def test_list_of_dicts_memory_key(self): + results = [{"memory": "A"}, {"memory": "B"}] + assert _extract_results(results) == ["A", "B"] + + def test_list_of_dicts_text_key(self): + results = [{"text": "C"}, {"text": "D"}] + assert _extract_results(results) == ["C", "D"] + + def test_v2_dict_wrapper(self): + results = {"results": [{"memory": "E"}, {"memory": "F"}]} + assert _extract_results(results) == ["E", "F"] + + def test_empty_list(self): + assert _extract_results([]) == [] + + def test_empty_v2_dict(self): + assert _extract_results({"results": []}) == [] + + def test_unknown_type_returns_empty(self): + assert _extract_results(None) == [] + assert _extract_results("string") == [] + + def test_filters_empty_strings(self): + results = [{"memory": "A"}, {"memory": ""}] + assert _extract_results(results) == ["A"] 
+ + +# --------------------------------------------------------------------------- +# _load_config +# --------------------------------------------------------------------------- + + +class TestLoadConfig: + def test_defaults(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + # Ensure all provider env vars are absent so auto-detect returns "auto" + # and _load_config falls back to the aws_bedrock default. + for k in ("OPENROUTER_API_KEY", "ANTHROPIC_API_KEY", "OPENAI_API_KEY", + "AWS_ACCESS_KEY_ID", "AWS_PROFILE", "MEM0_OSS_LLM_PROVIDER"): + monkeypatch.delenv(k, raising=False) + cfg = _load_config() + assert cfg["llm_provider"] == "aws_bedrock" + assert cfg["embedder_provider"] == "aws_bedrock" + assert cfg["collection"] == "hermes" + assert cfg["user_id"] == "hermes-user" + assert cfg["top_k"] == 10 + assert cfg["embedder_dims"] == 1024 + + def test_env_overrides(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + monkeypatch.setenv("MEM0_OSS_LLM_PROVIDER", "openai") + monkeypatch.setenv("MEM0_OSS_LLM_MODEL", "gpt-4o-mini") + monkeypatch.setenv("MEM0_OSS_USER_ID", "stan") + monkeypatch.setenv("MEM0_OSS_TOP_K", "5") + monkeypatch.setenv("MEM0_OSS_EMBEDDER_DIMS", "1536") + # Clear provider env vars so auto-detect doesn't interfere with provider assertion + for k in ("OPENROUTER_API_KEY", "ANTHROPIC_API_KEY", "OPENAI_API_KEY", + "AWS_ACCESS_KEY_ID", "AWS_PROFILE"): + monkeypatch.delenv(k, raising=False) + cfg = _load_config() + assert cfg["llm_provider"] == "openai" + assert cfg["llm_model"] == "gpt-4o-mini" + assert cfg["user_id"] == "stan" + assert cfg["top_k"] == 5 + assert cfg["embedder_dims"] == 1536 + + def test_json_file_overrides_env(self, tmp_path, monkeypatch): + 
monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + monkeypatch.setenv("MEM0_OSS_COLLECTION", "env-collection") + cfg_file = tmp_path / "mem0_oss.json" + cfg_file.write_text(json.dumps({"collection": "file-collection"})) + cfg = _load_config() + assert cfg["collection"] == "file-collection" + + def test_json_file_skips_null_values(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + monkeypatch.setenv("MEM0_OSS_USER_ID", "env-user") + cfg_file = tmp_path / "mem0_oss.json" + cfg_file.write_text(json.dumps({"user_id": None})) + cfg = _load_config() + assert cfg["user_id"] == "env-user" + + def test_malformed_json_falls_back_to_env(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + monkeypatch.setenv("MEM0_OSS_USER_ID", "env-user") + cfg_file = tmp_path / "mem0_oss.json" + cfg_file.write_text("{not valid json") + cfg = _load_config() + assert cfg["user_id"] == "env-user" + + def test_paths_scoped_to_hermes_home(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + cfg = _load_config() + assert cfg["vector_store_path"].startswith(str(tmp_path)) + assert cfg["history_db_path"].startswith(str(tmp_path)) + + def test_aux_config_provider_used(self, tmp_path, monkeypatch): + """auxiliary.mem0_oss.provider is respected.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", + lambda: {"provider": "openai", "model": "gpt-4o", "api_key": ""}) + cfg = _load_config() + 
assert cfg["llm_provider"] == "openai" + assert cfg["llm_model"] == "gpt-4o" + + def test_env_provider_overrides_aux_config(self, tmp_path, monkeypatch): + """MEM0_OSS_LLM_PROVIDER env var beats auxiliary.mem0_oss.provider.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", + lambda: {"provider": "anthropic", "model": ""}) + monkeypatch.setenv("MEM0_OSS_LLM_PROVIDER", "ollama") + cfg = _load_config() + assert cfg["llm_provider"] == "ollama" + + def test_aux_api_key_used_when_no_env_key(self, tmp_path, monkeypatch): + """auxiliary.mem0_oss.api_key propagates into cfg['api_key'].""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", + lambda: {"provider": "openai", "api_key": "aux-key-123"}) + monkeypatch.delenv("MEM0_OSS_API_KEY", raising=False) + cfg = _load_config() + assert cfg["api_key"] == "aux-key-123" + + def test_mem0_oss_api_key_beats_aux_key(self, tmp_path, monkeypatch): + """MEM0_OSS_API_KEY env var takes precedence over aux config api_key.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", + lambda: {"provider": "openai", "api_key": "aux-key"}) + monkeypatch.setenv("MEM0_OSS_API_KEY", "env-key") + cfg = _load_config() + assert cfg["api_key"] == "env-key" + + def test_hermes_provider_aliases_normalised(self, tmp_path, monkeypatch): + """Hermes aliases 'bedrock' and 'openrouter' map to mem0 provider keys.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + for hermes_alias, expected_mem0 in [("bedrock", "aws_bedrock"), + ("openrouter", "openai"), + ("aws", "aws_bedrock")]: + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", + lambda p=hermes_alias: {"provider": p}) + cfg = _load_config() + assert cfg["llm_provider"] 
== expected_mem0, f"{hermes_alias} → {expected_mem0}" + + def test_aux_base_url_used(self, tmp_path, monkeypatch): + """auxiliary.mem0_oss.base_url populates cfg['base_url'].""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", + lambda: {"provider": "openai", "base_url": "http://myhost/v1"}) + monkeypatch.delenv("MEM0_OSS_OPENAI_BASE_URL", raising=False) + cfg = _load_config() + assert cfg["base_url"] == "http://myhost/v1" + + def test_auxiliary_default_inherited_when_no_mem0_oss_key(self, tmp_path, monkeypatch): + """auxiliary.default is used when auxiliary.mem0_oss is not set.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + fake_config = { + "auxiliary": { + "default": { + "provider": "openai", + "model": "gpt-4o-mini", + "api_key": "default-key", + } + } + } + with patch("hermes_cli.config.load_config", return_value=fake_config): + from plugins.memory.mem0_oss import _get_aux_config + result = _get_aux_config() + assert result.get("provider") == "openai" + assert result.get("model") == "gpt-4o-mini" + assert result.get("api_key") == "default-key" + + def test_auxiliary_mem0_oss_key_overrides_default(self, tmp_path, monkeypatch): + """auxiliary.mem0_oss-specific keys win over auxiliary.default.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + fake_config = { + "auxiliary": { + "default": { + "provider": "openai", + "model": "gpt-4o-mini", + }, + "mem0_oss": { + "provider": "anthropic", + "model": "claude-haiku-4-5", + }, + } + } + with patch("hermes_cli.config.load_config", return_value=fake_config): + from plugins.memory.mem0_oss import _get_aux_config + result = _get_aux_config() + # mem0_oss-specific values win + assert result.get("provider") == "anthropic" + assert result.get("model") == "claude-haiku-4-5" + + def test_auxiliary_default_model_passed_to_load_config(self, tmp_path, 
monkeypatch): + """When auxiliary.default sets a model it ends up in llm_model.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + fake_config = { + "auxiliary": { + "default": { + "provider": "openai", + "model": "gpt-4o-mini", + "api_key": "sk-default", + } + } + } + with patch("hermes_cli.config.load_config", return_value=fake_config): + cfg = _load_config() + assert cfg["llm_model"] == "gpt-4o-mini" + assert cfg["llm_provider"] == "openai" + + +# --------------------------------------------------------------------------- +# _resolve_auto_credentials +# --------------------------------------------------------------------------- + + +from plugins.memory.mem0_oss import _resolve_auto_credentials + + +class TestResolveAutoCredentials: + def test_noop_when_explicit_provider_set(self, monkeypatch): + """Returns inputs unchanged when aux_provider is already set.""" + result = _resolve_auto_credentials("openai", "gpt-4o", "", "mykey") + assert result == ("openai", "gpt-4o", "", "mykey") + + def test_noop_when_provider_is_nonempty_non_auto(self, monkeypatch): + result = _resolve_auto_credentials("anthropic", "", "", "") + assert result[0] == "anthropic" + + def test_auto_picks_openrouter_key(self, monkeypatch): + monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test") + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + # Directly test the real function with no aux provider + provider, model, base_url, api_key = _resolve_auto_credentials("", "", "", "") + assert provider == "openrouter" + assert api_key == "sk-or-test" + assert "openrouter.ai" in base_url + + def test_auto_picks_anthropic_when_no_openrouter(self, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + 
monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + provider, model, base_url, api_key = _resolve_auto_credentials("", "", "", "") + assert provider == "anthropic" + assert api_key == "sk-ant-test" + + def test_auto_picks_openai_when_no_openrouter_or_anthropic(self, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.setenv("OPENAI_API_KEY", "sk-openai-test") + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + provider, model, base_url, api_key = _resolve_auto_credentials("", "", "", "") + assert provider == "openai" + assert api_key == "sk-openai-test" + + def test_auto_picks_bedrock_when_aws_key_set(self, monkeypatch): + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "AKID") + monkeypatch.delenv("AWS_PROFILE", raising=False) + provider, model, base_url, api_key = _resolve_auto_credentials("", "", "", "") + assert provider == "aws_bedrock" + + def test_auto_returns_auto_when_nothing_configured(self, monkeypatch): + for k in ("OPENROUTER_API_KEY", "ANTHROPIC_API_KEY", "OPENAI_API_KEY", + "AWS_ACCESS_KEY_ID", "AWS_PROFILE"): + monkeypatch.delenv(k, raising=False) + provider, model, base_url, api_key = _resolve_auto_credentials("", "", "", "") + assert provider == "auto" + + def test_auto_is_also_treated_as_unset(self, monkeypatch): + """aux_provider='auto' triggers auto-detect, same as empty string.""" + monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-auto") + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + 
monkeypatch.delenv("AWS_PROFILE", raising=False) + provider, model, base_url, api_key = _resolve_auto_credentials("auto", "", "", "") + assert provider == "openrouter" + assert api_key == "sk-or-auto" + + def test_explicit_aux_api_key_not_overwritten(self, monkeypatch): + """Existing aux_api_key is never replaced by auto-detected key.""" + monkeypatch.setenv("OPENROUTER_API_KEY", "should-not-use") + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + provider, model, base_url, api_key = _resolve_auto_credentials("", "", "", "my-existing-key") + # Provider resolved to openrouter, but key should be the existing one + assert api_key == "my-existing-key" + + +class TestLoadConfigAutoFallthrough: + """Integration: _load_config picks up provider key via auto-detect.""" + + def test_openrouter_key_propagates_when_no_aux_config(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-load-test") + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + cfg = _load_config() + assert cfg["api_key"] == "sk-or-load-test" + assert cfg["llm_provider"] == "openai" # openrouter normalises to openai + + def test_anthropic_key_propagates_when_no_aux_config(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-load-test") + 
monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + cfg = _load_config() + assert cfg["api_key"] == "sk-ant-load-test" + assert cfg["llm_provider"] == "anthropic" + + def test_explicit_mem0_oss_api_key_beats_auto(self, tmp_path, monkeypatch): + """MEM0_OSS_API_KEY always wins over auto-detected key.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setattr("plugins.memory.mem0_oss._get_aux_config", lambda: {}) + monkeypatch.setenv("OPENROUTER_API_KEY", "or-key") + monkeypatch.setenv("MEM0_OSS_API_KEY", "explicit-wins") + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + cfg = _load_config() + assert cfg["api_key"] == "explicit-wins" + + +# --------------------------------------------------------------------------- +# _build_mem0_config +# --------------------------------------------------------------------------- + + +class TestBuildMem0Config: + def _base_cfg(self, **overrides): + base = { + "vector_store_path": "/tmp/qdrant", + "history_db_path": "/tmp/history.db", + "collection": "hermes", + "user_id": "hermes-user", + "llm_provider": "aws_bedrock", + "llm_model": "some-model", + "embedder_provider": "aws_bedrock", + "embedder_model": "some-embed", + "embedder_dims": 1024, + "top_k": 10, + "api_key": "", + "base_url": "", + "openai_api_key": "", + "openai_base_url": "", + } + base.update(overrides) + return base + + def test_vector_store_is_qdrant_local(self): + out = _build_mem0_config(self._base_cfg()) + assert out["vector_store"]["provider"] == "qdrant" + assert out["vector_store"]["config"]["path"] == "/tmp/qdrant" + assert out["vector_store"]["config"]["on_disk"] is True + + def test_collection_name_passed(self): + out = 
_build_mem0_config(self._base_cfg(collection="my-col")) + assert out["vector_store"]["config"]["collection_name"] == "my-col" + + def test_embedding_dims_wired_to_qdrant(self): + out = _build_mem0_config(self._base_cfg(embedder_dims=768)) + assert out["vector_store"]["config"]["embedding_model_dims"] == 768 + + def test_openai_api_key_injected_when_provider_is_openai(self): + out = _build_mem0_config(self._base_cfg( + llm_provider="openai", + openai_api_key="sk-xxx", + )) + assert out["llm"]["config"]["api_key"] == "sk-xxx" + + def test_openai_api_key_not_injected_for_bedrock(self): + out = _build_mem0_config(self._base_cfg(openai_api_key="sk-xxx")) + assert "api_key" not in out["llm"]["config"] + + def test_openai_base_url_injected_when_set(self): + out = _build_mem0_config(self._base_cfg( + llm_provider="openai", + openai_api_key="k", + openai_base_url="http://localhost:1234/v1", + )) + assert out["llm"]["config"]["openai_base_url"] == "http://localhost:1234/v1" + + def test_history_db_path_passed(self): + out = _build_mem0_config(self._base_cfg(history_db_path="/tmp/h.db")) + assert out["history_db_path"] == "/tmp/h.db" + + def test_version_is_v1_1(self): + out = _build_mem0_config(self._base_cfg()) + assert out["version"] == "v1.1" + + +# --------------------------------------------------------------------------- +# is_available +# --------------------------------------------------------------------------- + + +class TestAvailability: + def test_available_with_bedrock_key(self, monkeypatch, tmp_path): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "AKID") + with patch("plugins.memory.mem0_oss._load_config", return_value={ + "llm_provider": "aws_bedrock", "openai_api_key": "", + }): + p = Mem0OSSMemoryProvider() + assert p.is_available() + + def test_not_available_without_mem0ai(self, monkeypatch, tmp_path): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: 
tmp_path) + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "AKID") + import sys + mem0_backup = sys.modules.get("mem0") + sys.modules["mem0"] = None # type: ignore[assignment] + try: + p = Mem0OSSMemoryProvider() + assert not p.is_available() + finally: + if mem0_backup is None: + sys.modules.pop("mem0", None) + else: + sys.modules["mem0"] = mem0_backup + + def test_not_available_without_aws_key(self, monkeypatch, tmp_path): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + with patch("plugins.memory.mem0_oss._load_config", return_value={ + "llm_provider": "aws_bedrock", "openai_api_key": "", + }): + with patch("agent.bedrock_adapter.has_aws_credentials", return_value=False): + p = Mem0OSSMemoryProvider() + assert not p.is_available() + + def test_available_via_has_aws_credentials_fallback(self, monkeypatch, tmp_path): + """is_available returns True when has_aws_credentials() is True even without env vars.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_PROFILE", raising=False) + with patch("plugins.memory.mem0_oss._load_config", return_value={ + "llm_provider": "aws_bedrock", "openai_api_key": "", + }): + with patch("agent.bedrock_adapter.has_aws_credentials", return_value=True): + p = Mem0OSSMemoryProvider() + assert p.is_available() + + def test_available_with_openai_key_in_config(self, monkeypatch, tmp_path): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + with patch("plugins.memory.mem0_oss._load_config", return_value={ + "llm_provider": "openai", "openai_api_key": "sk-test", + }): + p = Mem0OSSMemoryProvider() + assert p.is_available() + + def test_not_available_openai_without_key(self, monkeypatch, tmp_path): + 
monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + with patch("plugins.memory.mem0_oss._load_config", return_value={ + "llm_provider": "openai", "openai_api_key": "", + }): + p = Mem0OSSMemoryProvider() + assert not p.is_available() + + def test_ollama_always_available(self, monkeypatch, tmp_path): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + with patch("plugins.memory.mem0_oss._load_config", return_value={ + "llm_provider": "ollama", "openai_api_key": "", + }): + p = Mem0OSSMemoryProvider() + assert p.is_available() + + +# --------------------------------------------------------------------------- +# Tool schemas +# --------------------------------------------------------------------------- + + +class TestSchemas: + def test_search_schema_has_query(self): + assert SEARCH_SCHEMA["name"] == "mem0_oss_search" + assert "query" in SEARCH_SCHEMA["parameters"]["properties"] + assert "query" in SEARCH_SCHEMA["parameters"]["required"] + + def test_add_schema_has_content(self): + assert ADD_SCHEMA["name"] == "mem0_oss_add" + assert "content" in ADD_SCHEMA["parameters"]["properties"] + assert "content" in ADD_SCHEMA["parameters"]["required"] + + def test_get_tool_schemas_returns_both(self, provider): + schemas = provider.get_tool_schemas() + assert len(schemas) == 1 + names = {s["name"] for s in schemas} + assert names == {"mem0_oss_search"} + + +# --------------------------------------------------------------------------- +# Tool handlers +# --------------------------------------------------------------------------- + + +class TestToolHandlers: + def test_search_success(self, provider): + result = json.loads(provider.handle_tool_call("mem0_oss_search", {"query": "dark mode"})) + assert "dark mode" in result["result"] or "hermes-agent" in result["result"] + provider._mock_mem.search.assert_called_once() + kwargs = 
provider._mock_mem.search.call_args.kwargs + assert kwargs["query"] == "dark mode" + assert kwargs["top_k"] == 10 + assert kwargs["filters"] == {"user_id": "test-user"} + + def test_search_respects_top_k_arg(self, provider): + provider.handle_tool_call("mem0_oss_search", {"query": "q", "top_k": 3}) + kwargs = provider._mock_mem.search.call_args.kwargs + assert kwargs["top_k"] == 3 + + def test_search_caps_top_k_at_50(self, provider): + provider.handle_tool_call("mem0_oss_search", {"query": "q", "top_k": 999}) + kwargs = provider._mock_mem.search.call_args.kwargs + assert kwargs["top_k"] == 50 + + def test_search_no_results(self, provider): + provider._mock_mem.search.return_value = [] + result = json.loads(provider.handle_tool_call("mem0_oss_search", {"query": "q"})) + assert result["result"] == "No relevant memories found." + + def test_search_missing_query(self, provider): + result = json.loads(provider.handle_tool_call("mem0_oss_search", {})) + assert "error" in result + + def test_search_error_handling(self, provider): + provider._mock_mem.search.side_effect = RuntimeError("connection refused") + result = json.loads(provider.handle_tool_call("mem0_oss_search", {"query": "q"})) + assert "error" in result + assert "connection refused" in result["error"] + + def test_search_v2_dict_response(self, provider): + provider._mock_mem.search.return_value = { + "results": [{"memory": "A"}, {"memory": "B"}] + } + result = json.loads(provider.handle_tool_call("mem0_oss_search", {"query": "q"})) + assert "A" in result["result"] + assert "B" in result["result"] + + def test_add_success(self, provider): + result = json.loads(provider.handle_tool_call("mem0_oss_add", {"content": "User is Stan"})) + assert result["result"] == "Memory stored successfully." 
+ provider._mock_mem.add.assert_called_once() + kwargs = provider._mock_mem.add.call_args.kwargs + assert kwargs["user_id"] == "test-user" + assert kwargs["infer"] is True + msgs = kwargs["messages"] + assert msgs[0]["role"] == "user" + assert msgs[0]["content"] == "User is Stan" + + def test_add_missing_content(self, provider): + result = json.loads(provider.handle_tool_call("mem0_oss_add", {})) + assert "error" in result + + def test_add_error_handling(self, provider): + provider._mock_mem.add.side_effect = RuntimeError("LLM timeout") + result = json.loads(provider.handle_tool_call("mem0_oss_add", {"content": "test"})) + assert "error" in result + assert "LLM timeout" in result["error"] + + def test_unknown_tool(self, provider): + result = json.loads(provider.handle_tool_call("mem0_oss_unknown", {})) + assert "error" in result + + def test_search_lock_contention_returns_graceful_message(self, provider): + """Qdrant lock error produces a non-fatal result, not an error dict.""" + provider._mock_mem.search.side_effect = RuntimeError( + "Storage folder already accessed by another instance of Qdrant client" + ) + result = json.loads(provider.handle_tool_call("mem0_oss_search", {"query": "q"})) + # Should be a graceful result, not a tool_error + assert "result" in result + assert "locked" in result["result"].lower() or "unavailable" in result["result"].lower() + + def test_add_lock_contention_returns_graceful_message(self, provider): + """Qdrant lock error on add produces a non-fatal result.""" + provider._mock_mem.add.side_effect = RuntimeError( + "Storage folder already accessed by another instance of Qdrant client" + ) + result = json.loads(provider.handle_tool_call("mem0_oss_add", {"content": "test"})) + assert "result" in result + assert "locked" in result["result"].lower() or "unavailable" in result["result"].lower() + + +# --------------------------------------------------------------------------- +# Circuit breaker +# 
--------------------------------------------------------------------------- + + +class TestCircuitBreaker: + def test_breaker_trips_after_threshold(self, provider): + from plugins.memory.mem0_oss import _BREAKER_THRESHOLD + provider._mock_mem.search.side_effect = RuntimeError("fail") + for _ in range(_BREAKER_THRESHOLD): + provider.handle_tool_call("mem0_oss_search", {"query": "q"}) + assert provider._is_tripped() + + def test_breaker_resets_after_cooldown(self, provider): + from plugins.memory.mem0_oss import _BREAKER_THRESHOLD + provider._mock_mem.search.side_effect = RuntimeError("fail") + for _ in range(_BREAKER_THRESHOLD): + provider.handle_tool_call("mem0_oss_search", {"query": "q"}) + + provider._last_fail_ts = time.monotonic() - 999 # simulate expired cooldown + assert not provider._is_tripped() + + def test_breaker_clears_on_success(self, provider): + from plugins.memory.mem0_oss import _BREAKER_THRESHOLD + provider._mock_mem.search.side_effect = RuntimeError("fail") + for _ in range(_BREAKER_THRESHOLD - 1): + provider.handle_tool_call("mem0_oss_search", {"query": "q"}) + + provider._mock_mem.search.side_effect = None + provider._mock_mem.search.return_value = [{"memory": "ok"}] + provider.handle_tool_call("mem0_oss_search", {"query": "q"}) + assert provider._fail_count == 0 + + def test_tripped_breaker_skips_prefetch(self, provider): + from plugins.memory.mem0_oss import _BREAKER_THRESHOLD + with provider._lock: + provider._fail_count = _BREAKER_THRESHOLD + provider._last_fail_ts = time.monotonic() + provider.queue_prefetch("anything") + assert provider._prefetch_thread is None + + +# --------------------------------------------------------------------------- +# Prefetch +# --------------------------------------------------------------------------- + + +class TestPrefetch: + def test_prefetch_returns_formatted_result(self, provider): + provider.queue_prefetch("dark mode preferences") + if provider._prefetch_thread: + 
provider._prefetch_thread.join(timeout=5.0) + result = provider.prefetch("dark mode preferences") + assert "Mem0 OSS Memory" in result + assert "dark mode" in result or "hermes-agent" in result + + def test_prefetch_empty_on_no_results(self, provider): + provider._mock_mem.search.return_value = [] + provider.queue_prefetch("nothing here") + if provider._prefetch_thread: + provider._prefetch_thread.join(timeout=5.0) + assert provider.prefetch("nothing here") == "" + + def test_prefetch_truncates_long_query(self, provider): + long_query = "x" * 1000 + provider.queue_prefetch(long_query) + if provider._prefetch_thread: + provider._prefetch_thread.join(timeout=5.0) + call_args = provider._mock_mem.search.call_args + assert len(call_args.kwargs["query"]) <= 500 + + def test_prefetch_joins_thread_on_return(self, provider): + provider.queue_prefetch("test") + assert provider._prefetch_thread is not None + result = provider.prefetch("test") # should join internally + assert provider._prefetch_thread is None + + +# --------------------------------------------------------------------------- +# sync_turn +# --------------------------------------------------------------------------- + + +class TestSyncTurn: + def test_sync_turn_calls_add(self, provider): + provider.sync_turn("hello", "hi there") + if provider._sync_thread: + provider._sync_thread.join(timeout=5.0) + provider._mock_mem.add.assert_called_once() + kwargs = provider._mock_mem.add.call_args.kwargs + assert kwargs["user_id"] == "test-user" + assert kwargs["infer"] is True + msgs = kwargs["messages"] + assert msgs[0]["role"] == "user" + assert msgs[0]["content"] == "hello" + assert msgs[1]["role"] == "assistant" + assert msgs[1]["content"] == "hi there" + + def test_sync_turn_skipped_for_subagent(self, provider): + provider._agent_context = "subagent" + provider.sync_turn("hello", "hi") + assert provider._sync_thread is None + + def test_sync_turn_error_does_not_raise(self, provider): + 
provider._mock_mem.add.side_effect = RuntimeError("network down") + provider.sync_turn("hello", "hi") + if provider._sync_thread: + provider._sync_thread.join(timeout=5.0) + # No exception propagated + + def test_sync_turn_skipped_when_breaker_tripped(self, provider): + from plugins.memory.mem0_oss import _BREAKER_THRESHOLD + with provider._lock: + provider._fail_count = _BREAKER_THRESHOLD + provider._last_fail_ts = time.monotonic() + provider.sync_turn("hello", "hi") + assert provider._sync_thread is None + + +# --------------------------------------------------------------------------- +# System prompt block +# --------------------------------------------------------------------------- + + +class TestSystemPromptBlock: + def test_mentions_tool_names(self, provider): + block = provider.system_prompt_block() + assert "mem0_oss_search" in block + + def test_mentions_long_term_memory(self, provider): + block = provider.system_prompt_block() + assert "memory" in block.lower() + + +# --------------------------------------------------------------------------- +# Provider name +# --------------------------------------------------------------------------- + + +class TestProviderName: + def test_name(self, monkeypatch, tmp_path): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + assert Mem0OSSMemoryProvider().name == "mem0_oss" + + +# --------------------------------------------------------------------------- +# Config schema +# --------------------------------------------------------------------------- + + +class TestConfigSchema: + def test_schema_has_expected_keys(self, provider): + schema = provider.get_config_schema() + keys = {f["key"] for f in schema} + expected = { + "llm_provider", "llm_model", "embedder_provider", "embedder_model", + "embedder_dims", "collection", "user_id", "top_k", + # New dedicated key (preferred) + legacy alias still present + "api_key", "openai_api_key", + # base_url (replaces openai_base_url in name) + 
"base_url", + } + assert expected.issubset(keys) + + def test_api_key_is_secret(self, provider): + schema = provider.get_config_schema() + key_entry = next(f for f in schema if f["key"] == "api_key") + assert key_entry.get("secret") is True + + def test_openai_api_key_is_secret(self, provider): + schema = provider.get_config_schema() + key_entry = next(f for f in schema if f["key"] == "openai_api_key") + assert key_entry.get("secret") is True + + +# --------------------------------------------------------------------------- +# initialize +# --------------------------------------------------------------------------- + + +class TestInitialize: + def test_creates_storage_directories(self, tmp_path, monkeypatch): + with patch("plugins.memory.mem0_oss._load_config") as mock_cfg: + mock_cfg.return_value = { + "vector_store_path": str(tmp_path / "oss" / "qdrant"), + "history_db_path": str(tmp_path / "oss" / "history.db"), + "collection": "hermes", + "user_id": "u", + "llm_provider": "aws_bedrock", + "llm_model": "m", + "embedder_provider": "aws_bedrock", + "embedder_model": "e", + "embedder_dims": 1024, + "top_k": 10, + "openai_api_key": "", + "openai_base_url": "", + } + p = Mem0OSSMemoryProvider() + p.initialize("sess") + assert (tmp_path / "oss" / "qdrant").is_dir() + assert (tmp_path / "oss").is_dir() # parent of history.db + + def test_primary_context_allows_sync(self, provider): + assert provider._agent_context == "primary" + + def test_default_top_k(self, provider): + assert provider._top_k == 10 + + +# --------------------------------------------------------------------------- +# Shutdown +# --------------------------------------------------------------------------- + + +class TestOnMemoryWrite: + """on_memory_write mirrors builtin memory tool calls into mem0.""" + + def test_add_action_triggers_mem0_add(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + p = Mem0OSSMemoryProvider() + 
p.initialize("test-session") + p._user_id = "hermes-user" + + mock_mem = MagicMock() + p._get_memory = lambda: mock_mem + + p.on_memory_write("add", "user", "Stan likes light mode") + # Give the background thread a moment + import time; time.sleep(0.2) + + mock_mem.add.assert_called_once() + kwargs = mock_mem.add.call_args.kwargs + assert kwargs["user_id"] == "hermes-user" + assert kwargs["infer"] is False + assert kwargs["metadata"] == {"source": "hermes_memory_tool", "target": "user"} + msgs = kwargs["messages"] + assert isinstance(msgs, list) + assert msgs[0]["role"] == "user" + assert msgs[0]["content"] == "Stan likes light mode" + + def test_non_add_action_ignored(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + p = Mem0OSSMemoryProvider() + p.initialize("test-session") + mock_mem = MagicMock() + p._get_memory = lambda: mock_mem + + p.on_memory_write("replace", "user", "something") + p.on_memory_write("remove", "memory", "something") + import time; time.sleep(0.1) + + mock_mem.add.assert_not_called() + + def test_empty_content_ignored(self, tmp_path, monkeypatch): + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + p = Mem0OSSMemoryProvider() + p.initialize("test-session") + mock_mem = MagicMock() + p._get_memory = lambda: mock_mem + + p.on_memory_write("add", "user", "") + p.on_memory_write("add", "user", " ") + import time; time.sleep(0.1) + + mock_mem.add.assert_not_called() + + +class TestShutdown: + def test_shutdown_joins_sync_thread(self, provider): + finished = threading.Event() + + def _slow_sync(): + time.sleep(0.1) + finished.set() + + t = threading.Thread(target=_slow_sync, daemon=True) + t.start() + provider._sync_thread = t + provider.shutdown() + assert finished.is_set() + + def test_shutdown_no_error_when_no_threads(self, provider): + provider._sync_thread = None + provider._prefetch_thread = None + provider.shutdown() # should not raise + + +# 
--------------------------------------------------------------------------- +# save_config +# --------------------------------------------------------------------------- + + +class TestSaveConfig: + def test_creates_json_file(self, tmp_path, monkeypatch): + """save_config writes values to mem0_oss.json.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + p = Mem0OSSMemoryProvider() + p.save_config({"llm_provider": "openai", "user_id": "stan"}, tmp_path) + cfg_path = tmp_path / "mem0_oss.json" + assert cfg_path.exists() + saved = json.loads(cfg_path.read_text()) + assert saved["llm_provider"] == "openai" + assert saved["user_id"] == "stan" + + def test_merges_with_existing_file(self, tmp_path, monkeypatch): + """save_config merges into existing file, preserving untouched keys.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + p = Mem0OSSMemoryProvider() + cfg_path = tmp_path / "mem0_oss.json" + cfg_path.write_text(json.dumps({"collection": "old", "top_k": 5})) + p.save_config({"collection": "new"}, tmp_path) + saved = json.loads(cfg_path.read_text()) + assert saved["collection"] == "new" + assert saved["top_k"] == 5 # untouched + + def test_overwrites_existing_key(self, tmp_path, monkeypatch): + """save_config overwrites an existing key.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + p = Mem0OSSMemoryProvider() + cfg_path = tmp_path / "mem0_oss.json" + cfg_path.write_text(json.dumps({"user_id": "old-user"})) + p.save_config({"user_id": "new-user"}, tmp_path) + saved = json.loads(cfg_path.read_text()) + assert saved["user_id"] == "new-user" + + def test_graceful_if_existing_file_malformed(self, tmp_path, monkeypatch): + """save_config ignores malformed JSON and writes fresh.""" + monkeypatch.setattr("plugins.memory.mem0_oss.get_hermes_home", lambda: tmp_path) + p = Mem0OSSMemoryProvider() + cfg_path = tmp_path / "mem0_oss.json" + 
cfg_path.write_text("{not valid json") + p.save_config({"llm_provider": "openai"}, tmp_path) + saved = json.loads(cfg_path.read_text()) + assert saved["llm_provider"] == "openai" From 2791eb31d47adcaeb9d2b05257b6aab2dacaf7f7 Mon Sep 17 00:00:00 2001 From: bsgdigital Date: Fri, 24 Apr 2026 11:53:40 +0200 Subject: [PATCH 4/9] refactor(mem0_oss): update plugins/memory/mem0_oss/README.md --- plugins/memory/mem0_oss/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/memory/mem0_oss/README.md b/plugins/memory/mem0_oss/README.md index caf12e0e4..8c10bdb6a 100644 --- a/plugins/memory/mem0_oss/README.md +++ b/plugins/memory/mem0_oss/README.md @@ -127,13 +127,14 @@ by the setup wizard (via `save_config`), or written manually — see | Tool | Description | |------|-------------| | `mem0_oss_search` | Semantic search over stored memories | +| `mem0_oss_add` | Store a fact, preference, or context explicitly | Facts are extracted and stored automatically on every conversation turn via `sync_turn` — no explicit save call needed. Writes via the built-in `memory` tool are also mirrored automatically into mem0 via `on_memory_write`. To explicitly save something mid-session, use -the built-in `memory` tool; it will propagate to mem0 automatically. +`mem0_oss_add` (or the built-in `memory` tool — both propagate to mem0). 
## Concurrent access (WebUI + gateway) From 437047af173865f4a16145f7e2d6023c66378d84 Mon Sep 17 00:00:00 2001 From: bsgdigital Date: Fri, 24 Apr 2026 11:53:41 +0200 Subject: [PATCH 5/9] refactor(mem0_oss): update plugins/memory/mem0_oss/__init__.py --- plugins/memory/mem0_oss/__init__.py | 40 ++++++++++++++++++----------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/plugins/memory/mem0_oss/__init__.py b/plugins/memory/mem0_oss/__init__.py index 24ed00d96..3025245b9 100644 --- a/plugins/memory/mem0_oss/__init__.py +++ b/plugins/memory/mem0_oss/__init__.py @@ -500,6 +500,22 @@ class Mem0OSSMemoryProvider(MemoryProvider): (or OpenAI / Ollama) for LLM fact-extraction and embedding. """ + def __init__(self): + # Config / identity + self._cfg: dict = {} + self._user_id: str = "hermes-user" + self._top_k: int = 10 + self._session_id: str = "" + self._agent_context: str = "primary" + # Circuit-breaker state (lock-protected) + self._lock = threading.Lock() + self._fail_count: int = 0 + self._last_fail_ts: float = 0.0 + # Background thread state + self._sync_thread: Optional[threading.Thread] = None + self._prefetch_thread: Optional[threading.Thread] = None + self._prefetch_result: str = "" + # -- MemoryProvider identity -------------------------------------------- @property @@ -554,18 +570,12 @@ class Mem0OSSMemoryProvider(MemoryProvider): self._cfg = _load_config() self._user_id = self._cfg["user_id"] self._top_k = self._cfg["top_k"] - - # Circuit-breaker state - self._fail_count = 0 - self._last_fail_ts = 0.0 - self._lock = threading.Lock() - - # Background sync state - self._sync_thread: Optional[threading.Thread] = None - - # Prefetch state (background thread fills this before each turn) - self._prefetch_result: str = "" - self._prefetch_thread: Optional[threading.Thread] = None + # Reset circuit-breaker and prefetch state for this session. + # (Lock is created in __init__ and reused across sessions.) 
+ with self._lock: + self._fail_count = 0 + self._last_fail_ts = 0.0 + self._prefetch_result = "" import pathlib pathlib.Path(self._cfg["vector_store_path"]).mkdir(parents=True, exist_ok=True) pathlib.Path(self._cfg["history_db_path"]).parent.mkdir(parents=True, exist_ok=True) @@ -628,8 +638,8 @@ class Mem0OSSMemoryProvider(MemoryProvider): "## Mem0 OSS Memory (self-hosted)\n" "You have access to long-term memory stored locally via mem0.\n" "- Use `mem0_oss_search` to recall relevant facts before answering.\n" - "- Facts are extracted and deduplicated automatically on each turn via sync_turn.\n" - "- To explicitly save something, use the built-in `memory` tool — it mirrors to mem0 automatically.\n" + "- Use `mem0_oss_add` to store important new facts, preferences, or context.\n" + "- Facts are extracted and deduplicated automatically on each turn.\n" "- Search is semantic — natural-language queries work well.\n" ) @@ -714,7 +724,7 @@ class Mem0OSSMemoryProvider(MemoryProvider): # -- Tool schemas & dispatch ------------------------------------------- def get_tool_schemas(self) -> List[dict]: - return [SEARCH_SCHEMA] + return [SEARCH_SCHEMA, ADD_SCHEMA] def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str: if tool_name == "mem0_oss_search": From c4b925081f17b1d8907f7f7444c805a0516b13a0 Mon Sep 17 00:00:00 2001 From: bsgdigital Date: Fri, 24 Apr 2026 11:53:42 +0200 Subject: [PATCH 6/9] refactor(mem0_oss): update plugins/memory/mem0_oss/plugin.yaml --- plugins/memory/mem0_oss/plugin.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 plugins/memory/mem0_oss/plugin.yaml diff --git a/plugins/memory/mem0_oss/plugin.yaml b/plugins/memory/mem0_oss/plugin.yaml new file mode 100644 index 000000000..7bdee95ca --- /dev/null +++ b/plugins/memory/mem0_oss/plugin.yaml @@ -0,0 +1,6 @@ +name: mem0_oss +version: 1.0.0 +description: "Mem0 OSS — self-hosted LLM fact extraction with local Qdrant vector store. No cloud account required." 
+pip_dependencies: + - mem0ai + - qdrant-client From c6ee9ffcb1bac647ab527b28e032e33de7746f4f Mon Sep 17 00:00:00 2001 From: bsgdigital Date: Fri, 24 Apr 2026 11:53:43 +0200 Subject: [PATCH 7/9] refactor(mem0_oss): update tests/plugins/memory/test_mem0_oss_provider.py --- .../plugins/memory/test_mem0_oss_provider.py | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/tests/plugins/memory/test_mem0_oss_provider.py b/tests/plugins/memory/test_mem0_oss_provider.py index 66c3d17ce..11473def5 100644 --- a/tests/plugins/memory/test_mem0_oss_provider.py +++ b/tests/plugins/memory/test_mem0_oss_provider.py @@ -610,9 +610,9 @@ class TestSchemas: def test_get_tool_schemas_returns_both(self, provider): schemas = provider.get_tool_schemas() - assert len(schemas) == 1 + assert len(schemas) == 2 names = {s["name"] for s in schemas} - assert names == {"mem0_oss_search"} + assert names == {"mem0_oss_search", "mem0_oss_add"} # --------------------------------------------------------------------------- @@ -836,6 +836,7 @@ class TestSystemPromptBlock: def test_mentions_tool_names(self, provider): block = provider.system_prompt_block() assert "mem0_oss_search" in block + assert "mem0_oss_add" in block def test_mentions_long_term_memory(self, provider): block = provider.system_prompt_block() @@ -883,6 +884,39 @@ class TestConfigSchema: assert key_entry.get("secret") is True +# --------------------------------------------------------------------------- +# Pre-initialization safety (__init__ defaults) +# --------------------------------------------------------------------------- + + +class TestPreInit: + """Provider must be safe to inspect before initialize() is called.""" + + def test_name_before_init(self): + p = Mem0OSSMemoryProvider() + assert p.name == "mem0_oss" + + def test_lock_exists_before_init(self): + """_lock must exist from __init__ so circuit-breaker methods don't crash.""" + p = Mem0OSSMemoryProvider() + assert hasattr(p, "_lock") + 
import threading + assert isinstance(p._lock, type(threading.Lock())) + + def test_fail_count_zero_before_init(self): + p = Mem0OSSMemoryProvider() + assert p._fail_count == 0 + + def test_threads_none_before_init(self): + p = Mem0OSSMemoryProvider() + assert p._sync_thread is None + assert p._prefetch_thread is None + + def test_prefetch_result_empty_before_init(self): + p = Mem0OSSMemoryProvider() + assert p._prefetch_result == "" + + # --------------------------------------------------------------------------- # initialize # --------------------------------------------------------------------------- From 994c6f9e9867291d04ab4e84015e59af2ae0b17c Mon Sep 17 00:00:00 2001 From: Hermes Local Date: Fri, 24 Apr 2026 10:39:04 +0000 Subject: [PATCH 8/9] fix(mem0_oss): retry Qdrant lock contention in _get_memory with backoff Add _LOCK_RETRY_ATTEMPTS (10) and _LOCK_RETRY_DELAY_S (0.8s) constants. Retry loop in _get_memory() retries on Qdrant portalocker errors with jitter, non-lock errors still fail fast. Add explicit del mem after each operation to release the lock ASAP. Fix record_failure ordering in search and add so lock errors are tracked correctly. --- plugins/memory/mem0_oss/__init__.py | 81 ++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 25 deletions(-) diff --git a/plugins/memory/mem0_oss/__init__.py b/plugins/memory/mem0_oss/__init__.py index 3025245b9..6d998975d 100644 --- a/plugins/memory/mem0_oss/__init__.py +++ b/plugins/memory/mem0_oss/__init__.py @@ -75,6 +75,13 @@ _BREAKER_COOLDOWN_SECS = 120 # Qdrant embedded lock error substring — used to detect contention gracefully. _QDRANT_LOCK_ERROR = "already accessed by another instance" +# Retry parameters for Qdrant lock contention in _get_memory(). +# Two processes (WebUI + gateway) may briefly overlap; retry resolves it. +# Prefetch + sync operations hold the lock during an LLM call (~1-3s), +# so we retry for up to 15s total with jitter to avoid thundering herd. 
+_LOCK_RETRY_ATTEMPTS = 10 # total attempts +_LOCK_RETRY_DELAY_S = 0.8 # base seconds between retries (with jitter, up to ~0.4s extra) + # --------------------------------------------------------------------------- # Config helpers @@ -584,32 +591,50 @@ class Mem0OSSMemoryProvider(MemoryProvider): """Create a fresh mem0 Memory instance for each call. We intentionally do NOT cache the instance. The embedded Qdrant store - uses a file lock that is held for the lifetime of the client object. - When both the WebUI and the gateway run in the same host they would - otherwise compete for the lock — whichever process cached it first - would block all calls in the other process. + uses a portalocker (fcntl) exclusive lock that is held for the lifetime + of the client object. When both the WebUI and the gateway run on the + same host they compete for this lock. - By creating a new instance per call and letting it go out of scope - afterwards, the lock is acquired and released on each operation so - both processes can coexist. The overhead is acceptable: Qdrant - initialisation is fast once the collection already exists on disk. + We retry up to _LOCK_RETRY_ATTEMPTS times with _LOCK_RETRY_DELAY_S + seconds between attempts so that brief overlaps (e.g. a concurrent + prefetch in another process) are automatically resolved. 
""" - try: - from mem0 import Memory - from mem0.configs.base import MemoryConfig + import time as _time - mem0_dict = _build_mem0_config(self._cfg) - mem_cfg = MemoryConfig(**{ - "vector_store": mem0_dict["vector_store"], - "llm": mem0_dict["llm"], - "embedder": mem0_dict["embedder"], - "history_db_path": mem0_dict["history_db_path"], - "version": mem0_dict["version"], - }) - return Memory(config=mem_cfg) - except Exception as exc: - logger.error("mem0_oss: failed to initialize Memory: %s", exc) - raise + last_exc: Optional[Exception] = None + for attempt in range(_LOCK_RETRY_ATTEMPTS): + try: + from mem0 import Memory + from mem0.configs.base import MemoryConfig + + mem0_dict = _build_mem0_config(self._cfg) + mem_cfg = MemoryConfig(**{ + "vector_store": mem0_dict["vector_store"], + "llm": mem0_dict["llm"], + "embedder": mem0_dict["embedder"], + "history_db_path": mem0_dict["history_db_path"], + "version": mem0_dict["version"], + }) + return Memory(config=mem_cfg) + except Exception as exc: + last_exc = exc + if _QDRANT_LOCK_ERROR in str(exc): + if attempt < _LOCK_RETRY_ATTEMPTS - 1: + import random as _random + jitter = _random.uniform(0, _LOCK_RETRY_DELAY_S * 0.5) + delay = _LOCK_RETRY_DELAY_S + jitter + logger.debug( + "mem0_oss: Qdrant lock busy (attempt %d/%d), retrying in %.2fs", + attempt + 1, _LOCK_RETRY_ATTEMPTS, delay, + ) + _time.sleep(delay) + continue + else: + # Non-lock error — fail fast, no retry + logger.error("mem0_oss: failed to initialize Memory: %s", exc) + raise + logger.error("mem0_oss: failed to initialize Memory after %d attempts: %s", _LOCK_RETRY_ATTEMPTS, last_exc) + raise last_exc # type: ignore[misc] # -- Circuit breaker helpers ------------------------------------------- @@ -667,6 +692,7 @@ class Mem0OSSMemoryProvider(MemoryProvider): top_k=self._top_k, filters={"user_id": self._user_id}, ) + del mem # release Qdrant lock ASAP — before any further processing memories = _extract_results(results) if memories: lines = "\n".join(f"- {m}" 
for m in memories) @@ -713,6 +739,7 @@ class Mem0OSSMemoryProvider(MemoryProvider): try: mem = self._get_memory() mem.add(messages=messages, user_id=self._user_id, infer=True) + del mem # release Qdrant lock ASAP self._record_success() except Exception as exc: if _QDRANT_LOCK_ERROR in str(exc): @@ -747,16 +774,18 @@ class Mem0OSSMemoryProvider(MemoryProvider): top_k=top_k, filters={"user_id": self._user_id}, ) + del mem # release Qdrant lock ASAP memories = _extract_results(results) self._record_success() if not memories: return json.dumps({"result": "No relevant memories found."}) return json.dumps({"result": "\n".join(f"- {m}" for m in memories)}) except Exception as exc: - self._record_failure() if _QDRANT_LOCK_ERROR in str(exc): + self._record_failure() # already handled by retry in _get_memory, but track it logger.warning("mem0_oss: Qdrant lock held by another process — search skipped") return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."}) + self._record_failure() logger.error("mem0_oss: search error: %s", exc) return tool_error(f"mem0_oss_search failed: {exc}") @@ -772,13 +801,15 @@ class Mem0OSSMemoryProvider(MemoryProvider): user_id=self._user_id, infer=True, ) + del mem # release Qdrant lock ASAP self._record_success() return json.dumps({"result": "Memory stored successfully."}) except Exception as exc: - self._record_failure() if _QDRANT_LOCK_ERROR in str(exc): + self._record_failure() logger.warning("mem0_oss: Qdrant lock held by another process — add skipped") return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."}) + self._record_failure() logger.error("mem0_oss: add error: %s", exc) return tool_error(f"mem0_oss_add failed: {exc}") From 9dca3f1c85f23d6f363ad27897106aad38c5e015 Mon Sep 17 00:00:00 2001 From: Hermes Local Date: Fri, 24 Apr 2026 10:49:13 +0000 Subject: [PATCH 9/9] fix(mem0_oss): fix misrouted last-attempt lock error in _get_memory The else branch 
for non-lock errors was incorrectly triggered on the final retry attempt of a Qdrant lock error, causing it to log ERROR and raise immediately rather than fall through to the lock-exhausted path. This tripped the circuit breaker during startup when WebUI and gateway processes competed for the lock. Fix: add a comment to skip the else on last lock-error attempt, and downgrade the exhausted-retries log from ERROR to WARNING since lock contention is expected and not a hard failure. --- plugins/memory/mem0_oss/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugins/memory/mem0_oss/__init__.py b/plugins/memory/mem0_oss/__init__.py index 6d998975d..27a476734 100644 --- a/plugins/memory/mem0_oss/__init__.py +++ b/plugins/memory/mem0_oss/__init__.py @@ -629,11 +629,15 @@ class Mem0OSSMemoryProvider(MemoryProvider): ) _time.sleep(delay) continue + # Last attempt also a lock error — fall through to raise below else: # Non-lock error — fail fast, no retry logger.error("mem0_oss: failed to initialize Memory: %s", exc) raise - logger.error("mem0_oss: failed to initialize Memory after %d attempts: %s", _LOCK_RETRY_ATTEMPTS, last_exc) + logger.warning( + "mem0_oss: Qdrant lock still held after %d attempts — giving up: %s", + _LOCK_RETRY_ATTEMPTS, last_exc, + ) raise last_exc # type: ignore[misc] # -- Circuit breaker helpers -------------------------------------------