diff --git a/plugins/memory/mem0_oss/__init__.py b/plugins/memory/mem0_oss/__init__.py
index 3025245b9..6d998975d 100644
--- a/plugins/memory/mem0_oss/__init__.py
+++ b/plugins/memory/mem0_oss/__init__.py
@@ -75,6 +75,13 @@ _BREAKER_COOLDOWN_SECS = 120
 # Qdrant embedded lock error substring — used to detect contention gracefully.
 _QDRANT_LOCK_ERROR = "already accessed by another instance"
 
+# Retry parameters for Qdrant lock contention in _get_memory().
+# Two processes (WebUI + gateway) may briefly overlap; retry resolves it.
+# Prefetch + sync operations hold the lock during an LLM call (~1-3s), so we
+# retry for up to ~11s total (9 waits of 0.8-1.2s), jittered to avoid a thundering herd.
+_LOCK_RETRY_ATTEMPTS = 10  # total attempts
+_LOCK_RETRY_DELAY_S = 0.8  # base seconds between retries (with jitter, up to ~0.4s extra)
+
 
 # ---------------------------------------------------------------------------
 # Config helpers
@@ -584,32 +591,50 @@ class Mem0OSSMemoryProvider(MemoryProvider):
         """Create a fresh mem0 Memory instance for each call.
 
         We intentionally do NOT cache the instance. The embedded Qdrant store
-        uses a file lock that is held for the lifetime of the client object.
-        When both the WebUI and the gateway run in the same host they would
-        otherwise compete for the lock — whichever process cached it first
-        would block all calls in the other process.
+        uses a portalocker (fcntl) exclusive lock that is held for the lifetime
+        of the client object. When both the WebUI and the gateway run on the
+        same host they compete for this lock.
 
-        By creating a new instance per call and letting it go out of scope
-        afterwards, the lock is acquired and released on each operation so
-        both processes can coexist. The overhead is acceptable: Qdrant
-        initialisation is fast once the collection already exists on disk.
+        We retry up to _LOCK_RETRY_ATTEMPTS times with _LOCK_RETRY_DELAY_S
+        seconds between attempts so that brief overlaps (e.g. a concurrent
+        prefetch in another process) are automatically resolved.
         """
-        try:
-            from mem0 import Memory
-            from mem0.configs.base import MemoryConfig
+        import time as _time
 
-            mem0_dict = _build_mem0_config(self._cfg)
-            mem_cfg = MemoryConfig(**{
-                "vector_store": mem0_dict["vector_store"],
-                "llm": mem0_dict["llm"],
-                "embedder": mem0_dict["embedder"],
-                "history_db_path": mem0_dict["history_db_path"],
-                "version": mem0_dict["version"],
-            })
-            return Memory(config=mem_cfg)
-        except Exception as exc:
-            logger.error("mem0_oss: failed to initialize Memory: %s", exc)
-            raise
+        last_exc: Optional[Exception] = None
+        for attempt in range(_LOCK_RETRY_ATTEMPTS):
+            try:
+                from mem0 import Memory
+                from mem0.configs.base import MemoryConfig
+
+                mem0_dict = _build_mem0_config(self._cfg)
+                mem_cfg = MemoryConfig(**{
+                    "vector_store": mem0_dict["vector_store"],
+                    "llm": mem0_dict["llm"],
+                    "embedder": mem0_dict["embedder"],
+                    "history_db_path": mem0_dict["history_db_path"],
+                    "version": mem0_dict["version"],
+                })
+                return Memory(config=mem_cfg)
+            except Exception as exc:
+                last_exc = exc
+                if _QDRANT_LOCK_ERROR in str(exc):
+                    if attempt < _LOCK_RETRY_ATTEMPTS - 1:
+                        import random as _random
+                        jitter = _random.uniform(0, _LOCK_RETRY_DELAY_S * 0.5)
+                        delay = _LOCK_RETRY_DELAY_S + jitter
+                        logger.debug(
+                            "mem0_oss: Qdrant lock busy (attempt %d/%d), retrying in %.2fs",
+                            attempt + 1, _LOCK_RETRY_ATTEMPTS, delay,
+                        )
+                        _time.sleep(delay)
+                        continue
+                else:
+                    # Non-lock error — fail fast, no retry
+                    logger.error("mem0_oss: failed to initialize Memory: %s", exc)
+                    raise
+        logger.error("mem0_oss: failed to initialize Memory after %d attempts: %s", _LOCK_RETRY_ATTEMPTS, last_exc)
+        raise last_exc  # type: ignore[misc]
 
     # -- Circuit breaker helpers -------------------------------------------
 
@@ -667,6 +692,7 @@ class Mem0OSSMemoryProvider(MemoryProvider):
                     top_k=self._top_k,
                     filters={"user_id": self._user_id},
                 )
+                del mem  # drop our ref before further processing; on CPython this finalizes the client and frees the Qdrant lock
                 memories = _extract_results(results)
                 if memories:
                     lines = "\n".join(f"- {m}" for m in memories)
@@ -713,6 +739,7 @@ class Mem0OSSMemoryProvider(MemoryProvider):
         try:
             mem = self._get_memory()
             mem.add(messages=messages, user_id=self._user_id, infer=True)
+            del mem  # drop ref so the Qdrant lock is freed promptly (CPython refcount finalization)
             self._record_success()
         except Exception as exc:
             if _QDRANT_LOCK_ERROR in str(exc):
@@ -747,16 +774,18 @@ class Mem0OSSMemoryProvider(MemoryProvider):
                 top_k=top_k,
                 filters={"user_id": self._user_id},
             )
+            del mem  # drop ref so the Qdrant lock is freed promptly (CPython refcount finalization)
             memories = _extract_results(results)
             self._record_success()
             if not memories:
                 return json.dumps({"result": "No relevant memories found."})
             return json.dumps({"result": "\n".join(f"- {m}" for m in memories)})
         except Exception as exc:
-            self._record_failure()
             if _QDRANT_LOCK_ERROR in str(exc):
+                self._record_failure()  # already handled by retry in _get_memory, but track it
                 logger.warning("mem0_oss: Qdrant lock held by another process — search skipped")
                 return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."})
+            self._record_failure()
             logger.error("mem0_oss: search error: %s", exc)
             return tool_error(f"mem0_oss_search failed: {exc}")
 
@@ -772,13 +801,15 @@ class Mem0OSSMemoryProvider(MemoryProvider):
                 user_id=self._user_id,
                 infer=True,
             )
+            del mem  # drop ref so the Qdrant lock is freed promptly (CPython refcount finalization)
             self._record_success()
             return json.dumps({"result": "Memory stored successfully."})
         except Exception as exc:
-            self._record_failure()
             if _QDRANT_LOCK_ERROR in str(exc):
+                self._record_failure()
                 logger.warning("mem0_oss: Qdrant lock held by another process — add skipped")
                 return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."})
+            self._record_failure()
             logger.error("mem0_oss: add error: %s", exc)
             return tool_error(f"mem0_oss_add failed: {exc}")