mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(mem0_oss): retry Qdrant lock contention in _get_memory with backoff
Add _LOCK_RETRY_ATTEMPTS (10) and _LOCK_RETRY_DELAY_S (0.8s) constants. Retry loop in _get_memory() retries on Qdrant portalocker errors with jitter, non-lock errors still fail fast. Add explicit del mem after each operation to release the lock ASAP. Fix record_failure ordering in search and add so lock errors are tracked correctly.
This commit is contained in:
parent
c6ee9ffcb1
commit
994c6f9e98
1 changed file with 56 additions and 25 deletions
|
|
@ -75,6 +75,13 @@ _BREAKER_COOLDOWN_SECS = 120
|
|||
# Qdrant embedded lock error substring — used to detect contention gracefully.
|
||||
_QDRANT_LOCK_ERROR = "already accessed by another instance"
|
||||
|
||||
# Retry parameters for Qdrant lock contention in _get_memory().
|
||||
# Two processes (WebUI + gateway) may briefly overlap; retry resolves it.
|
||||
# Prefetch + sync operations hold the lock during an LLM call (~1-3s),
|
||||
# so we retry with jitter (up to ~11s of backoff total) to avoid thundering herd.
|
||||
_LOCK_RETRY_ATTEMPTS = 10 # total attempts
|
||||
_LOCK_RETRY_DELAY_S = 0.8 # base seconds between retries (with jitter, up to ~0.4s extra)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config helpers
|
||||
|
|
@ -584,32 +591,50 @@ class Mem0OSSMemoryProvider(MemoryProvider):
|
|||
"""Create a fresh mem0 Memory instance for each call.
|
||||
|
||||
We intentionally do NOT cache the instance. The embedded Qdrant store
|
||||
uses a file lock that is held for the lifetime of the client object.
|
||||
When both the WebUI and the gateway run in the same host they would
|
||||
otherwise compete for the lock — whichever process cached it first
|
||||
would block all calls in the other process.
|
||||
uses a portalocker (fcntl) exclusive lock that is held for the lifetime
|
||||
of the client object. When both the WebUI and the gateway run on the
|
||||
same host they compete for this lock.
|
||||
|
||||
By creating a new instance per call and letting it go out of scope
|
||||
afterwards, the lock is acquired and released on each operation so
|
||||
both processes can coexist. The overhead is acceptable: Qdrant
|
||||
initialisation is fast once the collection already exists on disk.
|
||||
We retry up to _LOCK_RETRY_ATTEMPTS times with _LOCK_RETRY_DELAY_S
|
||||
seconds between attempts so that brief overlaps (e.g. a concurrent
|
||||
prefetch in another process) are automatically resolved.
|
||||
"""
|
||||
try:
|
||||
from mem0 import Memory
|
||||
from mem0.configs.base import MemoryConfig
|
||||
import time as _time
|
||||
|
||||
mem0_dict = _build_mem0_config(self._cfg)
|
||||
mem_cfg = MemoryConfig(**{
|
||||
"vector_store": mem0_dict["vector_store"],
|
||||
"llm": mem0_dict["llm"],
|
||||
"embedder": mem0_dict["embedder"],
|
||||
"history_db_path": mem0_dict["history_db_path"],
|
||||
"version": mem0_dict["version"],
|
||||
})
|
||||
return Memory(config=mem_cfg)
|
||||
except Exception as exc:
|
||||
logger.error("mem0_oss: failed to initialize Memory: %s", exc)
|
||||
raise
|
||||
last_exc: Optional[Exception] = None
|
||||
for attempt in range(_LOCK_RETRY_ATTEMPTS):
|
||||
try:
|
||||
from mem0 import Memory
|
||||
from mem0.configs.base import MemoryConfig
|
||||
|
||||
mem0_dict = _build_mem0_config(self._cfg)
|
||||
mem_cfg = MemoryConfig(**{
|
||||
"vector_store": mem0_dict["vector_store"],
|
||||
"llm": mem0_dict["llm"],
|
||||
"embedder": mem0_dict["embedder"],
|
||||
"history_db_path": mem0_dict["history_db_path"],
|
||||
"version": mem0_dict["version"],
|
||||
})
|
||||
return Memory(config=mem_cfg)
|
||||
except Exception as exc:
|
||||
last_exc = exc
|
||||
if _QDRANT_LOCK_ERROR in str(exc):
|
||||
if attempt < _LOCK_RETRY_ATTEMPTS - 1:
|
||||
import random as _random
|
||||
jitter = _random.uniform(0, _LOCK_RETRY_DELAY_S * 0.5)
|
||||
delay = _LOCK_RETRY_DELAY_S + jitter
|
||||
logger.debug(
|
||||
"mem0_oss: Qdrant lock busy (attempt %d/%d), retrying in %.2fs",
|
||||
attempt + 1, _LOCK_RETRY_ATTEMPTS, delay,
|
||||
)
|
||||
_time.sleep(delay)
|
||||
continue
|
||||
else:
|
||||
# Non-lock error — fail fast, no retry
|
||||
logger.error("mem0_oss: failed to initialize Memory: %s", exc)
|
||||
raise
|
||||
logger.error("mem0_oss: failed to initialize Memory after %d attempts: %s", _LOCK_RETRY_ATTEMPTS, last_exc)
|
||||
raise last_exc # type: ignore[misc]
|
||||
|
||||
# -- Circuit breaker helpers -------------------------------------------
|
||||
|
||||
|
|
@ -667,6 +692,7 @@ class Mem0OSSMemoryProvider(MemoryProvider):
|
|||
top_k=self._top_k,
|
||||
filters={"user_id": self._user_id},
|
||||
)
|
||||
del mem # release Qdrant lock ASAP — before any further processing
|
||||
memories = _extract_results(results)
|
||||
if memories:
|
||||
lines = "\n".join(f"- {m}" for m in memories)
|
||||
|
|
@ -713,6 +739,7 @@ class Mem0OSSMemoryProvider(MemoryProvider):
|
|||
try:
|
||||
mem = self._get_memory()
|
||||
mem.add(messages=messages, user_id=self._user_id, infer=True)
|
||||
del mem # release Qdrant lock ASAP
|
||||
self._record_success()
|
||||
except Exception as exc:
|
||||
if _QDRANT_LOCK_ERROR in str(exc):
|
||||
|
|
@ -747,16 +774,18 @@ class Mem0OSSMemoryProvider(MemoryProvider):
|
|||
top_k=top_k,
|
||||
filters={"user_id": self._user_id},
|
||||
)
|
||||
del mem # release Qdrant lock ASAP
|
||||
memories = _extract_results(results)
|
||||
self._record_success()
|
||||
if not memories:
|
||||
return json.dumps({"result": "No relevant memories found."})
|
||||
return json.dumps({"result": "\n".join(f"- {m}" for m in memories)})
|
||||
except Exception as exc:
|
||||
self._record_failure()
|
||||
if _QDRANT_LOCK_ERROR in str(exc):
|
||||
self._record_failure() # already handled by retry in _get_memory, but track it
|
||||
logger.warning("mem0_oss: Qdrant lock held by another process — search skipped")
|
||||
return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."})
|
||||
self._record_failure()
|
||||
logger.error("mem0_oss: search error: %s", exc)
|
||||
return tool_error(f"mem0_oss_search failed: {exc}")
|
||||
|
||||
|
|
@ -772,13 +801,15 @@ class Mem0OSSMemoryProvider(MemoryProvider):
|
|||
user_id=self._user_id,
|
||||
infer=True,
|
||||
)
|
||||
del mem # release Qdrant lock ASAP
|
||||
self._record_success()
|
||||
return json.dumps({"result": "Memory stored successfully."})
|
||||
except Exception as exc:
|
||||
self._record_failure()
|
||||
if _QDRANT_LOCK_ERROR in str(exc):
|
||||
self._record_failure()
|
||||
logger.warning("mem0_oss: Qdrant lock held by another process — add skipped")
|
||||
return json.dumps({"result": "Memory temporarily unavailable (storage locked by another process)."})
|
||||
self._record_failure()
|
||||
logger.error("mem0_oss: add error: %s", exc)
|
||||
return tool_error(f"mem0_oss_add failed: {exc}")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue