Mirror of https://github.com/NousResearch/hermes-agent.git (synced 2026-06-24 10:52:21 +00:00)
* fix: update to version 3 endpoints and add update and delete tools
* chore: removing the test md file
* fix: prevent circuit breaker on client errors in Mem0 provider
* chore: add telemetry for platform version
* feat: add OSS mode support to Mem0 memory provider
* chore: bump mem0ai dependency to >=2.0.1 in memory plugin
* refactor: enhance dependency checks and embedder config in mem0 backend
* refactor: adjust fact storage message for OSS mode
* refactor: expand user paths, add collection recreation on dimension change for Qdrant
* fix(mem0): make MEM0_USER_ID override gateway-native ids and tag writes with channel
When MEM0_USER_ID was configured (env or mem0.json), the gateway-native id
from kwargs (Telegram numeric id, Discord snowflake, ...) still won, so the
same human ended up under different user_ids per channel and memories never
merged across CLI / Telegram / Slack / Discord. Mirrors openclaw's cfg.userId
pattern: configured override wins, gateway-native id is the fallback.
The legacy "hermes-user" placeholder default written by the setup wizard is
treated as unset to avoid silently bucketing every gateway user together.
Also tag every write with metadata.channel (cli/telegram/discord/...) so the
dashboard can offer per-channel filtered views without coupling identity to
the channel; document the read/write filter asymmetry as intentional
(reads scope to user_id only for cross-agent recall).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* refactor: improve Mem0 memory provider backend, pagination, config, and error handling
* refactor: update mem0 telemetry code, docs, and bump version
* fix(mem0): make get_config_schema() return unified schema with mode-aware required flag
Schema always includes api_key field so picker shows "API key / local" for
both modes. In OSS mode api_key.required=False so status won't mislead.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* refactor: improve mem0 telemetry, add env var key and OSS mode detection
* chore: bump mem0ai lower bound to 2.0.4 (latest SDK release)
* refactor: set telemetry sample rate to 1.0 and update docs for opt-out
* fix(mem0): resolve 15 correctness, thread-safety, and resource bugs
Thread safety:
- Protect circuit breaker counters with _breaker_lock (race between
prefetch/sync daemon threads and main thread)
- Wrap sync_turn thread creation in _sync_lock; skip if previous sync
is still alive after 5 s join to prevent duplicate memory ingestion
- Guard _schedule_flush timer creation under _queue_lock (TOCTOU race)
- Capture local `backend` reference in prefetch/sync closures so
shutdown() nulling self._backend cannot crash in-flight threads
Correctness:
- Fix bool("false")==True for rerank param; parse string values explicitly
- Guard page/top_k with max(1,...) and move int() inside try blocks
- Fix fact_count=0 always in OSS mode (Memory.add returns list, not dict)
- Fix prefetch() not clearing result when thread still alive after timeout
- Fix atexit.register accumulating on repeated initialize() calls
Backend / setup:
- Handle Qdrant named-vector collections in _recreate_collection_if_dims_changed
(vectors is a dict; .size access raised AttributeError, swallowed silently)
- Wrap QdrantClient and psycopg2 conn/cursor in try/finally to prevent leaks
- Resolve ollama_bin at top of _ensure_ollama; use it for ollama pull
- Fix embedder key lookup when LLM provider has no env_var (e.g. ollama)
Also: remove _telemetry_enabled cache (env var check is cheap), bump
required mem0ai to >=2.0.7, minor README wording fix.
* fix(mem0): fix brittle qdrant path test + add telemetry sample-rate docs
- Replace generator-throw lambda with a proper def in
test_qdrant_path_not_writable; use tmp_path instead of a hardcoded
/nonexistent path so the test is root-safe
- Add MEM0_TELEMETRY_SAMPLE_RATE to memory-providers.md (was only
in the plugin README, not the user-guide docs)
* revert: remove MEM0_TELEMETRY_SAMPLE_RATE from user-guide docs
* refactor: remove telemetry from mem0 plugin and update documentation
* fix(mem0): set stdin=DEVNULL on setup subprocess calls
The TUI stdin guard (scripts/check_subprocess_stdin.py) requires every
subprocess call in plugin code to set stdin= so it can't inherit the
gateway's JSON-RPC stdin fd. Muzzle the docker/ollama calls in the OSS
setup wizard with stdin=subprocess.DEVNULL (none need interactive input).
Also covers the docker-inspect call the linter's regex misses.
---------
Co-authored-by: chaithanyak42 <chaithanya.kumar42a@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
243 lines
8.8 KiB
Python
"""Backend abstraction for Mem0 Platform and OSS modes."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any
|
|
|
|
|
|
class Mem0Backend(ABC):
    """Common contract shared by the Platform (``MemoryClient``) and OSS (``Memory``) backends.

    Concrete subclasses must implement every abstract method below; ``close``
    is optional and does nothing unless a subclass overrides it.
    """

    @abstractmethod
    def search(self, query: str, *, filters: dict, top_k: int = 10, rerank: bool = True) -> list[dict]:
        """Return at most ``top_k`` memories relevant to ``query`` that match ``filters``."""

    @abstractmethod
    def get_all(self, *, filters: dict, page: int = 1, page_size: int = 100) -> dict:
        """Return one page (1-based ``page``, ``page_size`` items) of memories matching ``filters``."""

    @abstractmethod
    def add(
        self,
        messages: list,
        *,
        user_id: str,
        agent_id: str,
        infer: bool = False,
        metadata: dict | None = None,
    ) -> dict:
        """Store ``messages`` under ``user_id``/``agent_id``, optionally tagged with ``metadata``."""

    @abstractmethod
    def update(self, memory_id: str, text: str) -> dict:
        """Replace the text of the memory identified by ``memory_id``."""

    @abstractmethod
    def delete(self, memory_id: str) -> dict:
        """Remove the memory identified by ``memory_id``."""

    def close(self) -> None:
        """Release backend resources; the base implementation intentionally does nothing."""
|
|
|
|
|
|
def _unwrap_results(response: Any) -> list:
|
|
"""Normalize API response — extract results list from dict or pass through."""
|
|
if isinstance(response, dict):
|
|
return response.get("results", [])
|
|
if isinstance(response, list):
|
|
return response
|
|
return []
|
|
|
|
|
|
class PlatformBackend(Mem0Backend):
    """Adapter around ``mem0.MemoryClient`` for the hosted Mem0 Platform (cloud API)."""

    def __init__(self, api_key: str):
        # Imported lazily (presumably so this module can be imported without
        # mem0ai installed until a Platform backend is actually built).
        from mem0 import MemoryClient

        self._client = MemoryClient(api_key=api_key)

    def search(self, query: str, *, filters: dict, top_k: int = 10, rerank: bool = True) -> list[dict]:
        """Run a semantic search on the Platform and return the result items as a list."""
        response = self._client.search(query, filters=filters, top_k=top_k, rerank=rerank)
        return _unwrap_results(response)

    def get_all(self, *, filters: dict, page: int = 1, page_size: int = 100) -> dict:
        """Return one server-side page of memories as ``{"results": [...], "count": N}``.

        ``count`` is taken from the API response when it is a dict that
        provides one; otherwise it falls back to the number of items returned.
        """
        response = self._client.get_all(filters=filters, page=page, page_size=page_size)
        # Use the shared helper so an unexpected response shape (e.g. None)
        # yields an empty page instead of a TypeError from len().
        results = _unwrap_results(response)
        count = response.get("count", len(results)) if isinstance(response, dict) else len(results)
        return {"results": results, "count": count}

    def add(
        self,
        messages: list,
        *,
        user_id: str,
        agent_id: str,
        infer: bool = False,
        metadata: dict | None = None,
    ) -> dict:
        """Store ``messages`` on the Platform and return the client's raw response."""
        kwargs: dict[str, Any] = {"user_id": user_id, "agent_id": agent_id, "infer": infer}
        # Empty/None metadata is omitted rather than sent as an empty object.
        if metadata:
            kwargs["metadata"] = metadata
        return self._client.add(messages, **kwargs)

    def update(self, memory_id: str, text: str) -> dict:
        """Replace a memory's text; returns a fixed confirmation dict (the API response is discarded)."""
        self._client.update(memory_id=memory_id, text=text)
        return {"result": "Memory updated.", "memory_id": memory_id}

    def delete(self, memory_id: str) -> dict:
        """Delete a memory; returns a fixed confirmation dict (the API response is discarded)."""
        self._client.delete(memory_id=memory_id)
        return {"result": "Memory deleted.", "memory_id": memory_id}
|
|
|
|
|
|
class OSSBackend(Mem0Backend):
    """Adapter around ``mem0.Memory`` for self-hosted (OSS) mode."""

    def __init__(self, oss_config: dict):
        """Build a ``mem0.Memory`` instance from the plugin's OSS config.

        Args:
            oss_config: Mapping with ``vector_store``, ``llm`` and ``embedder``
                sections (presumably each shaped ``{"provider": ..., "config": {...}}``
                — confirm against the config writer).

        Raises:
            KeyError: if ``vector_store``, ``llm`` or ``embedder`` is missing.
        """
        import os

        from mem0 import Memory

        vector_store = dict(oss_config["vector_store"])
        # Copy so expanding the path / injecting dims never mutates the caller's config.
        vs_config = dict(vector_store.get("config") or {})

        if vs_config.get("path"):
            vs_config["path"] = os.path.expanduser(vs_config["path"])

        embedder_config = (oss_config.get("embedder") or {}).get("config") or {}
        dims = self._coerce_dims(embedder_config.get("embedding_dims"))
        if dims is None:
            from ._oss_providers import KNOWN_DIMS

            dims = self._coerce_dims(KNOWN_DIMS.get(embedder_config.get("model", "")))
        if dims is not None:
            vs_config["embedding_model_dims"] = dims
            # Must run before Memory.from_config opens the store with the new size.
            self._recreate_collection_if_dims_changed(
                vector_store.get("provider", "qdrant"), vs_config, dims,
            )

        vector_store["config"] = vs_config

        config = {
            "vector_store": vector_store,
            "llm": oss_config["llm"],
            "embedder": oss_config["embedder"],
            "version": "v1.1",
        }
        self._memory = Memory.from_config(config)

    @staticmethod
    def _coerce_dims(value: Any) -> int | None:
        """Return ``value`` as a positive ``int``, or ``None`` if it is missing or invalid."""
        if value is None or isinstance(value, bool):
            return None
        try:
            dims = int(value)
        except (TypeError, ValueError):
            return None
        return dims if dims > 0 else None

    @staticmethod
    def _recreate_collection_if_dims_changed(provider: str, vs_config: dict, expected_dims: int) -> None:
        """Drop a stale vector collection/table whose dimension differs from ``expected_dims``.

        WARNING: dropping the collection discards every memory stored in it.

        Only ``qdrant`` (local ``path`` or remote ``url``) and ``pgvector`` are
        inspected; other providers are left untouched. This is best effort:
        any failure (driver not installed, server unreachable, collection
        missing) is logged at DEBUG and ignored so startup is never blocked.
        """
        # Normalize first: a string such as "768" never compares equal to the
        # stored int dimension and would wipe the collection on every start.
        expected = OSSBackend._coerce_dims(expected_dims)
        if expected is None:
            return

        collection_name = vs_config.get("collection_name", "mem0")
        try:
            if provider == "qdrant":
                OSSBackend._drop_stale_qdrant_collection(vs_config, collection_name, expected)
            elif provider == "pgvector":
                OSSBackend._drop_stale_pgvector_table(vs_config, collection_name, expected)
        except Exception:
            import logging

            logging.getLogger(__name__).debug(
                "Skipping %s dimension check for collection %r",
                provider,
                collection_name,
                exc_info=True,
            )

    @staticmethod
    def _drop_stale_qdrant_collection(vs_config: dict, collection_name: str, expected_dims: int) -> None:
        """Delete the Qdrant collection if its vector size differs from ``expected_dims``."""
        from qdrant_client import QdrantClient

        path = vs_config.get("path")
        url = vs_config.get("url")
        if path:
            client = QdrantClient(path=path)
        elif url:
            client = QdrantClient(url=url, api_key=vs_config.get("api_key"))
        else:
            return

        try:
            if not client.collection_exists(collection_name):
                return
            vectors = client.get_collection(collection_name).config.params.vectors
            # Named-vector collections expose a dict of params; unnamed ones a
            # single params object with ``.size``.
            if isinstance(vectors, dict):
                first = next(iter(vectors.values()), None)
                current_dims = getattr(first, "size", None)
            else:
                current_dims = getattr(vectors, "size", None)
            if current_dims is not None and current_dims != expected_dims:
                client.delete_collection(collection_name)
        finally:
            client.close()

    @staticmethod
    def _drop_stale_pgvector_table(vs_config: dict, collection_name: str, expected_dims: int) -> None:
        """Drop the pgvector table if its ``vector`` column dimension differs from ``expected_dims``."""
        import psycopg2
        from psycopg2 import sql as pgsql

        connection_string = vs_config.get("connection_string")
        if connection_string:
            # Inspect the same database mem0 will use; connecting with empty
            # params would silently target the libpq default database instead.
            conn = psycopg2.connect(connection_string)
        else:
            conn_params = {
                key: vs_config[key]
                for key in ("host", "port", "user", "password", "dbname", "sslmode")
                if vs_config.get(key)
            }
            conn = psycopg2.connect(**conn_params)

        try:
            conn.autocommit = True
            with conn.cursor() as cur:
                # For a pgvector ``vector(N)`` column, atttypmod holds N.
                cur.execute(
                    "SELECT atttypmod FROM pg_attribute "
                    "WHERE attrelid = %s::regclass AND attname = 'vector'",
                    (collection_name,),
                )
                row = cur.fetchone()
                if row and row[0] > 0 and row[0] != expected_dims:
                    cur.execute(pgsql.SQL("DROP TABLE IF EXISTS {}").format(
                        pgsql.Identifier(collection_name)
                    ))
        finally:
            conn.close()

    def search(self, query: str, *, filters: dict, top_k: int = 10, rerank: bool = True) -> list[dict]:
        """Search local memories and return the result items as a list.

        ``rerank`` is accepted for interface parity with the Platform backend
        but is not forwarded to ``Memory.search``.
        """
        response = self._memory.search(query, filters=filters, top_k=top_k)
        return _unwrap_results(response)

    def get_all(self, *, filters: dict, page: int = 1, page_size: int = 100) -> dict:
        """Return one client-side page of memories as ``{"results": [...], "count": N}``.

        ``Memory.get_all`` has no paging here, so everything is fetched and
        sliced locally; ``count`` is the total number fetched.
        """
        # NOTE(review): Memory.get_all may apply its own default result cap; if
        # so, pages beyond that cap come back empty — confirm against mem0ai.
        response = self._memory.get_all(filters=filters)
        all_results = _unwrap_results(response)
        # Clamp so page <= 0 behaves like page 1 instead of producing a
        # negative (wrap-around) slice start.
        start = max(page - 1, 0) * page_size
        return {"results": all_results[start : start + page_size], "count": len(all_results)}

    def add(
        self,
        messages: list,
        *,
        user_id: str,
        agent_id: str,
        infer: bool = False,
        metadata: dict | None = None,
    ) -> dict:
        """Store ``messages`` locally and return ``Memory.add``'s raw response."""
        kwargs: dict[str, Any] = {"user_id": user_id, "agent_id": agent_id, "infer": infer}
        # Empty/None metadata is omitted rather than passed through.
        if metadata:
            kwargs["metadata"] = metadata
        return self._memory.add(messages, **kwargs)

    def update(self, memory_id: str, text: str) -> dict:
        """Replace a memory's text; returns a fixed confirmation dict."""
        self._memory.update(memory_id, data=text)
        return {"result": "Memory updated.", "memory_id": memory_id}

    def delete(self, memory_id: str) -> dict:
        """Delete a memory; returns a fixed confirmation dict."""
        self._memory.delete(memory_id)
        return {"result": "Memory deleted.", "memory_id": memory_id}

    @staticmethod
    def _call_quietly(obj: Any, method: str) -> None:
        """Invoke ``obj.<method>()`` if it exists and is callable, ignoring any error it raises."""
        fn = getattr(obj, method, None)
        if not callable(fn):
            return
        try:
            fn()
        except Exception:
            pass

    def close(self) -> None:
        """Best-effort shutdown of telemetry, the Memory instance, and its vector store.

        Each step is attempted independently, so a failure in one (e.g.
        ``Memory.close``) no longer prevents the vector-store client from
        being closed.
        """
        memory = getattr(self, "_memory", None)
        if memory is None:
            return
        telemetry = getattr(memory, "telemetry", None)
        if telemetry:
            self._call_quietly(getattr(telemetry, "posthog", None), "shutdown")
        self._call_quietly(memory, "close")
        vector_store = getattr(memory, "vector_store", None)
        if vector_store:
            self._call_quietly(vector_store, "close")
        self._call_quietly(getattr(vector_store, "client", None), "close")
|