From 3082fa0829e0df4ce682358481fb59275b31a46e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 5 May 2026 14:46:22 +0200 Subject: [PATCH] feat(hindsight): probe API for update_mode='append' support, dedupe across processes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the pattern already shipping in hindsight-integrations/openclaw: probe `/version` once per process, gate on Hindsight ≥ 0.5.0. When supported, retains use a stable session-scoped `document_id` (`session_id`) plus `update_mode='append'` so cross-process retains for the same session merge into one document instead of producing N-different-process-stamped duplicates. When unsupported (or probe fails), fall back to the existing per-process unique `f"{session_id}-{start_ts}"` document_id with no `update_mode` — the resume-overwrite fix (#6654) keeps working unchanged on legacy servers. Closes the dedup half of #20115. The proposed `document_id_strategy` config knob isn't needed: auto-detection via the same /version probe the OpenClaw plugin already uses gives the same outcome with no extra config burden, and the choice is purely a function of what the server can do. Plumbing -------- - Module-level helpers (`_meets_minimum_version`, `_fetch_hindsight_api_version`, `_check_api_supports_update_mode_append`) cache the result per api_url so every provider in the process gets one /version round-trip. - One-time WARN logged when the API is older than 0.5.0, telling the user to upgrade for cross-session deduplication. - New instance helper `_resolve_retain_target(fallback_doc_id)` returns `(document_id, update_mode)` based on cached capability. Wired into `sync_turn` and the `on_session_switch` flush path. - For local_embedded mode, the probe URL is taken from the running client (`client.url`) so we hit the actual daemon port rather than the configured default. - `update_mode` is set on the per-item dict; `aretain_batch` already threads `item['update_mode']` into the API call. Tests ----- - `TestUpdateModeAppendCapability` (5 cases): legacy fallback, modern stable+append, per-url cache, one-time warn, flush-on-switch resolves against the OLD session. - Existing `_make_hindsight_provider` factory in the manager-side test file extended to seed `_mode`/`_api_url`/`_api_key`/`_client` and stub `_resolve_retain_target` so the bypass-init pattern keeps working. E2E verified against installed `~/.hermes/hermes-agent`: - Legacy probe (unreachable host) → `legacy-session-` doc_id, no `update_mode`. - Modern probe (live local_embedded 0.5.6 daemon) → stable `modern-session` doc_id + `update_mode='append'`. - `test_hermes_embedded_smoke.py` passes (90s). --- plugins/memory/hindsight/__init__.py | 151 +++++++++++++++++- tests/agent/test_memory_session_switch.py | 8 + .../plugins/memory/test_hindsight_provider.py | 104 ++++++++++++ 3 files changed, 257 insertions(+), 6 deletions(-) diff --git a/plugins/memory/hindsight/__init__.py b/plugins/memory/hindsight/__init__.py index a280cbafd4..b7751a918e 100644 --- a/plugins/memory/hindsight/__init__.py +++ b/plugins/memory/hindsight/__init__.py @@ -52,6 +52,12 @@ _DEFAULT_LOCAL_URL = "http://localhost:8888" _MIN_CLIENT_VERSION = "0.4.22" _DEFAULT_TIMEOUT = 120 # seconds — cloud API can take 30-40s per request _DEFAULT_IDLE_TIMEOUT = 300 # seconds — Hindsight embedded daemon default +# Mirrors hindsight-integrations/openclaw — Hindsight 0.5.0 added +# `update_mode='append'` semantics on retain (vectorize-io/hindsight#932). +# Without it, reusing a stable session-scoped document_id silently +# overwrites prior turns server-side, so we keep the per-process +# unique document_id fallback for older APIs. +_MIN_VERSION_FOR_UPDATE_MODE_APPEND = "0.5.0" _VALID_BUDGETS = {"low", "mid", "high"} _PROVIDER_DEFAULT_MODELS = { "openai": "gpt-4o-mini", @@ -93,6 +99,95 @@ def _check_local_runtime() -> tuple[bool, str | None]: return False, str(exc) +# --------------------------------------------------------------------------- +# Hindsight API capability probe — mirrors hindsight-integrations/openclaw. +# --------------------------------------------------------------------------- + +# Cache of API_URL -> bool (whether that API supports update_mode='append'). +# Probed once per URL per process — every provider talking to the same API +# gets the same answer without re-hitting /version on each initialize(). +_append_capability_cache: Dict[str, bool] = {} +_append_capability_lock = threading.Lock() + + +def _meets_minimum_version(actual: str | None, required: str) -> bool: + """Return True if *actual* ≥ *required* (semver). False on missing/invalid.""" + if not actual: + return False + try: + from packaging.version import Version + return Version(actual) >= Version(required) + except Exception: + return False + + +def _fetch_hindsight_api_version(api_url: str, api_key: str | None = None, + timeout: float = 5.0) -> str | None: + """GET ``/version`` and return the version string (or None on failure). + + Hindsight's `/version` endpoint returns ``{"version": "0.5.6", ...}``. + Any failure (timeout, 404, malformed JSON, missing key) → None, which + the caller treats as "legacy API, no update_mode support". + """ + import urllib.error + import urllib.request + if not api_url: + return None + url = api_url.rstrip("/") + "/version" + req = urllib.request.Request(url) + if api_key: + req.add_header("Authorization", f"Bearer {api_key}") + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310 + payload = resp.read().decode("utf-8", errors="replace") + data = json.loads(payload) + except Exception as exc: + logger.debug("Hindsight /version probe failed for %s: %s", url, exc) + return None + if not isinstance(data, dict): + return None + version = data.get("version") or data.get("api_version") + return str(version) if version else None + + +def _check_api_supports_update_mode_append(api_url: str, + api_key: str | None = None) -> bool: + """Cached capability check for ``update_mode='append'`` on *api_url*. + + Probes once per URL per process. Returns False on any probe failure — + that's the safe default: a per-process unique ``document_id`` and no + ``update_mode`` keeps the resume-overwrite fix (#6654) intact. + """ + if not api_url: + return False + with _append_capability_lock: + if api_url in _append_capability_cache: + return _append_capability_cache[api_url] + version = _fetch_hindsight_api_version(api_url, api_key) + supported = _meets_minimum_version(version, _MIN_VERSION_FOR_UPDATE_MODE_APPEND) + with _append_capability_lock: + # Re-check after acquiring the lock in case a concurrent probe filled it. + cached = _append_capability_cache.get(api_url) + if cached is None: + _append_capability_cache[api_url] = supported + else: + supported = cached + if not supported: + logger.warning( + "Hindsight API at %s reports version %r, older than %s. " + "Falling back to per-process document_id — retains across " + "processes/sessions create separate documents instead of " + "appending to a session-scoped one. Upgrade Hindsight to " + "%s+ to enable update_mode='append' deduplication.", + api_url, version, _MIN_VERSION_FOR_UPDATE_MODE_APPEND, + _MIN_VERSION_FOR_UPDATE_MODE_APPEND, + ) + else: + logger.debug("Hindsight API %s version %s supports update_mode='append'", + api_url, version) + return supported + + # --------------------------------------------------------------------------- # Dedicated event loop for Hindsight async calls (one per process, reused). # Avoids creating ephemeral loops that leak aiohttp sessions. @@ -918,6 +1013,40 @@ class HindsightMemoryProvider(MemoryProvider): self._client = client return self._run_sync(operation(client)) + def _probe_url(self) -> str: + """Return the URL to probe /version on. + + For local_embedded the daemon is on a per-profile dynamic port, + so we prefer the running client's URL when available; otherwise + fall back to the configured api_url. + """ + if self._mode == "local_embedded" and self._client is not None: + url = getattr(self._client, "url", None) + if url: + return str(url) + return self._api_url or "" + + def _resolve_retain_target(self, fallback_document_id: str) -> tuple[str, str | None]: + """Pick (document_id, update_mode) based on live API capability. + + On Hindsight ≥ 0.5.0 the API supports ``update_mode='append'``, + which lets us reuse a stable session-scoped ``document_id`` across + process lifecycles without overwriting prior turns. On older APIs + we fall back to *fallback_document_id* (the per-process unique + ``f"{session_id}-{start_ts}"`` minted at initialize / switch time) + and don't pass ``update_mode`` at all — that's the only way the + resume-overwrite fix (#6654) keeps working on legacy servers. + + Probe is cached at module level per API URL, so this is one HTTP + round-trip per (process, api_url) pair regardless of how many + retains fire. + """ + if not self._session_id: + return fallback_document_id, None + if _check_api_supports_update_mode_append(self._probe_url(), self._api_key): + return self._session_id, "append" + return fallback_document_id, None + def initialize(self, session_id: str, **kwargs) -> None: self._session_id = str(session_id or "").strip() self._parent_session_id = str(kwargs.get("parent_session_id", "") or "").strip() @@ -1319,7 +1448,7 @@ class HindsightMemoryProvider(MemoryProvider): turn_index=self._turn_index, ) num_turns = len(self._session_turns) - document_id = self._document_id + document_id, update_mode = self._resolve_retain_target(self._document_id) bank_id = self._bank_id retain_async_flag = self._retain_async retain_context = self._retain_context @@ -1333,8 +1462,10 @@ class HindsightMemoryProvider(MemoryProvider): ) item.pop("bank_id", None) item.pop("retain_async", None) - logger.debug("Hindsight retain: bank=%s, doc=%s, async=%s, content_len=%d, num_turns=%d", - bank_id, document_id, retain_async_flag, len(content), num_turns) + if update_mode is not None: + item["update_mode"] = update_mode + logger.debug("Hindsight retain: bank=%s, doc=%s, mode=%s, async=%s, content_len=%d, num_turns=%d", + bank_id, document_id, update_mode, retain_async_flag, len(content), num_turns) self._run_hindsight_operation( lambda client: client.aretain_batch( bank_id=bank_id, @@ -1471,7 +1602,6 @@ class HindsightMemoryProvider(MemoryProvider): if self._session_turns: old_turns = list(self._session_turns) old_session_id = self._session_id - old_document_id = self._document_id old_parent_session_id = self._parent_session_id old_turn_index = self._turn_index old_metadata = self._build_metadata( @@ -1484,6 +1614,13 @@ class HindsightMemoryProvider(MemoryProvider): if old_parent_session_id: old_lineage_tags.append(f"parent:{old_parent_session_id}") old_content = "[" + ",".join(old_turns) + "]" + # Resolve doc_id + update_mode against the OLD session BEFORE + # we rotate _session_id, so the flush lands in the old + # session's document either way (legacy: per-process unique; + # ≥0.5.0: stable session-scoped + append). + old_document_id, old_update_mode = self._resolve_retain_target( + self._document_id + ) def _flush(): try: @@ -1495,9 +1632,11 @@ class HindsightMemoryProvider(MemoryProvider): ) item.pop("bank_id", None) item.pop("retain_async", None) + if old_update_mode is not None: + item["update_mode"] = old_update_mode logger.debug( - "Hindsight flush-on-switch: bank=%s, doc=%s, num_turns=%d", - self._bank_id, old_document_id, len(old_turns), + "Hindsight flush-on-switch: bank=%s, doc=%s, mode=%s, num_turns=%d", + self._bank_id, old_document_id, old_update_mode, len(old_turns), ) self._run_hindsight_operation( lambda client: client.aretain_batch( diff --git a/tests/agent/test_memory_session_switch.py b/tests/agent/test_memory_session_switch.py index 610c09b29f..61cd6edbaf 100644 --- a/tests/agent/test_memory_session_switch.py +++ b/tests/agent/test_memory_session_switch.py @@ -248,6 +248,14 @@ def _make_hindsight_provider(): provider._atexit_registered = True provider._ensure_writer = lambda: None provider._register_atexit = lambda: None + # Mode + API state used by _resolve_retain_target; stub the resolver + # so tests don't actually probe the API. Real probe behavior is + # exercised by tests in tests/plugins/memory/test_hindsight_provider.py. + provider._mode = "cloud" + provider._api_url = "" + provider._api_key = "" + provider._client = None + provider._resolve_retain_target = lambda fb: (fb, None) # Stub the network-touching helper so any enqueued flush closure is # a no-op if ever drained in a unit test. provider._run_hindsight_operation = lambda _op: None diff --git a/tests/plugins/memory/test_hindsight_provider.py b/tests/plugins/memory/test_hindsight_provider.py index 334e6ab5ea..fcda46e56b 100644 --- a/tests/plugins/memory/test_hindsight_provider.py +++ b/tests/plugins/memory/test_hindsight_provider.py @@ -1072,6 +1072,110 @@ class TestSessionSwitchBufferFlush: assert call_order[1] == "3" +# --------------------------------------------------------------------------- +# update_mode='append' capability probe + retain dispatch +# --------------------------------------------------------------------------- + + +class TestUpdateModeAppendCapability: + def _clear_capability_cache(self): + from plugins.memory.hindsight import _append_capability_cache, _append_capability_lock + with _append_capability_lock: + _append_capability_cache.clear() + + def test_legacy_api_falls_back_to_per_process_doc_id(self, provider, monkeypatch): + """API returns no /version (or pre-0.5.0) — sync_turn must use the + per-process unique doc_id and NOT pass update_mode.""" + self._clear_capability_cache() + monkeypatch.setattr( + "plugins.memory.hindsight._fetch_hindsight_api_version", + lambda *a, **kw: None, + ) + old_doc = provider._document_id + provider.sync_turn("hello", "hi") + provider._retain_queue.join() + + kw = provider._client.aretain_batch.call_args.kwargs + assert kw["document_id"] == old_doc + assert kw["document_id"].startswith("test-session-") + item = kw["items"][0] + assert "update_mode" not in item + + def test_modern_api_uses_stable_doc_id_with_append(self, provider, monkeypatch): + """API on >=0.5.0 — retain uses stable session_id and sets update_mode='append'.""" + self._clear_capability_cache() + monkeypatch.setattr( + "plugins.memory.hindsight._fetch_hindsight_api_version", + lambda *a, **kw: "0.5.6", + ) + provider.sync_turn("hello", "hi") + provider._retain_queue.join() + + kw = provider._client.aretain_batch.call_args.kwargs + # Stable: just the session id, no per-process timestamp suffix. + assert kw["document_id"] == "test-session" + item = kw["items"][0] + assert item["update_mode"] == "append" + + def test_capability_cached_per_url(self, provider, monkeypatch): + """The /version probe must run at most once per (process, api_url).""" + self._clear_capability_cache() + calls = {"n": 0} + + def _spy(*a, **kw): + calls["n"] += 1 + return "0.5.6" + + monkeypatch.setattr( + "plugins.memory.hindsight._fetch_hindsight_api_version", _spy + ) + provider.sync_turn("a", "b") + provider._retain_queue.join() + provider.sync_turn("c", "d") + provider._retain_queue.join() + assert calls["n"] == 1 + + def test_legacy_warning_emitted_once(self, provider, monkeypatch, caplog): + """One-time WARN nudges users to upgrade Hindsight.""" + import logging + self._clear_capability_cache() + monkeypatch.setattr( + "plugins.memory.hindsight._fetch_hindsight_api_version", + lambda *a, **kw: "0.4.22", + ) + with caplog.at_level(logging.WARNING, logger="plugins.memory.hindsight"): + provider.sync_turn("a", "b") + provider._retain_queue.join() + provider.sync_turn("c", "d") + provider._retain_queue.join() + warns = [r for r in caplog.records + if r.levelno == logging.WARNING + and "older than 0.5.0" in r.getMessage()] + # Cache hit on the second call → no second warn. + assert len(warns) == 1 + + def test_session_switch_flush_picks_capability_against_old_session( + self, provider_with_config, monkeypatch + ): + """When the API supports append, the flush on /reset must land + in the OLD session's stable document, not a per-process id.""" + self._clear_capability_cache() + monkeypatch.setattr( + "plugins.memory.hindsight._fetch_hindsight_api_version", + lambda *a, **kw: "0.5.6", + ) + p = provider_with_config(retain_every_n_turns=3, retain_async=False) + p.sync_turn("turn1-user", "turn1-asst") + p.sync_turn("turn2-user", "turn2-asst") + p.on_session_switch("new-sid", parent_session_id="test-session", reset=True) + p._retain_queue.join() + + kw = p._client.aretain_batch.call_args.kwargs + # Flush goes to the OLD session's stable doc, not new-sid's. + assert kw["document_id"] == "test-session" + assert kw["items"][0]["update_mode"] == "append" + + # --------------------------------------------------------------------------- # System prompt tests # ---------------------------------------------------------------------------