feat(hindsight): probe API for update_mode='append' support, dedupe across processes

Mirrors the pattern already shipping in hindsight-integrations/openclaw:
probe `<api_url>/version` once per process, gate on Hindsight ≥ 0.5.0.
When supported, retains use a stable session-scoped `document_id`
(`session_id`) plus `update_mode='append'` so cross-process retains for
the same session merge into one document instead of producing
N-different-process-stamped duplicates. When unsupported (or probe
fails), fall back to the existing per-process unique
`f"{session_id}-{start_ts}"` document_id with no `update_mode` — the
resume-overwrite fix (#6654) keeps working unchanged on legacy servers.

Closes the dedup half of #20115. The proposed `document_id_strategy`
config knob isn't needed: auto-detection via the same /version probe
the OpenClaw plugin already uses gives the same outcome with no extra
config burden, and the choice is purely a function of what the server
can do.

Plumbing
--------
- Module-level helpers (`_meets_minimum_version`, `_fetch_hindsight_api_version`,
  `_check_api_supports_update_mode_append`) cache the result per api_url
  so every provider in the process gets one /version round-trip.
- One-time WARN logged when the API is older than 0.5.0, telling the
  user to upgrade for cross-session deduplication.
- New instance helper `_resolve_retain_target(fallback_doc_id)` returns
  `(document_id, update_mode)` based on cached capability. Wired into
  `sync_turn` and the `on_session_switch` flush path.
- For local_embedded mode, the probe URL is taken from the running
  client (`client.url`) so we hit the actual daemon port rather than
  the configured default.
- `update_mode` is set on the per-item dict; `aretain_batch` already
  threads `item['update_mode']` into the API call.

Tests
-----
- `TestUpdateModeAppendCapability` (5 cases): legacy fallback, modern
  stable+append, per-url cache, one-time warn, flush-on-switch resolves
  against the OLD session.
- Existing `_make_hindsight_provider` factory in the manager-side test
  file extended to seed `_mode`/`_api_url`/`_api_key`/`_client` and stub
  `_resolve_retain_target` so the bypass-init pattern keeps working.

E2E verified against installed `~/.hermes/hermes-agent`:
- Legacy probe (unreachable host) → `legacy-session-<ts>` doc_id,
  no `update_mode`.
- Modern probe (live local_embedded 0.5.6 daemon) → stable
  `modern-session` doc_id + `update_mode='append'`.
- `test_hermes_embedded_smoke.py` passes (90s).
This commit is contained in:
Nicolò Boschi 2026-05-05 14:46:22 +02:00 committed by Teknium
parent 1efed67056
commit 3082fa0829
3 changed files with 257 additions and 6 deletions

View file

@ -52,6 +52,12 @@ _DEFAULT_LOCAL_URL = "http://localhost:8888"
_MIN_CLIENT_VERSION = "0.4.22"
_DEFAULT_TIMEOUT = 120 # seconds — cloud API can take 30-40s per request
_DEFAULT_IDLE_TIMEOUT = 300 # seconds — Hindsight embedded daemon default
# Mirrors hindsight-integrations/openclaw — Hindsight 0.5.0 added
# `update_mode='append'` semantics on retain (vectorize-io/hindsight#932).
# Without it, reusing a stable session-scoped document_id silently
# overwrites prior turns server-side, so we keep the per-process
# unique document_id fallback for older APIs.
_MIN_VERSION_FOR_UPDATE_MODE_APPEND = "0.5.0"
_VALID_BUDGETS = {"low", "mid", "high"}
_PROVIDER_DEFAULT_MODELS = {
"openai": "gpt-4o-mini",
@ -93,6 +99,95 @@ def _check_local_runtime() -> tuple[bool, str | None]:
return False, str(exc)
# ---------------------------------------------------------------------------
# Hindsight API capability probe — mirrors hindsight-integrations/openclaw.
# ---------------------------------------------------------------------------
# Cache of API_URL -> bool (whether that API supports update_mode='append').
# Probed once per URL per process — every provider talking to the same API
# gets the same answer without re-hitting /version on each initialize().
_append_capability_cache: Dict[str, bool] = {}
_append_capability_lock = threading.Lock()
def _meets_minimum_version(actual: str | None, required: str) -> bool:
"""Return True if *actual* ≥ *required* (semver). False on missing/invalid."""
if not actual:
return False
try:
from packaging.version import Version
return Version(actual) >= Version(required)
except Exception:
return False
def _fetch_hindsight_api_version(api_url: str, api_key: str | None = None,
timeout: float = 5.0) -> str | None:
"""GET ``<api_url>/version`` and return the version string (or None on failure).
Hindsight's `/version` endpoint returns ``{"version": "0.5.6", ...}``.
Any failure (timeout, 404, malformed JSON, missing key) None, which
the caller treats as "legacy API, no update_mode support".
"""
import urllib.error
import urllib.request
if not api_url:
return None
url = api_url.rstrip("/") + "/version"
req = urllib.request.Request(url)
if api_key:
req.add_header("Authorization", f"Bearer {api_key}")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310
payload = resp.read().decode("utf-8", errors="replace")
data = json.loads(payload)
except Exception as exc:
logger.debug("Hindsight /version probe failed for %s: %s", url, exc)
return None
if not isinstance(data, dict):
return None
version = data.get("version") or data.get("api_version")
return str(version) if version else None
def _check_api_supports_update_mode_append(api_url: str,
api_key: str | None = None) -> bool:
"""Cached capability check for ``update_mode='append'`` on *api_url*.
Probes once per URL per process. Returns False on any probe failure
that's the safe default: a per-process unique ``document_id`` and no
``update_mode`` keeps the resume-overwrite fix (#6654) intact.
"""
if not api_url:
return False
with _append_capability_lock:
if api_url in _append_capability_cache:
return _append_capability_cache[api_url]
version = _fetch_hindsight_api_version(api_url, api_key)
supported = _meets_minimum_version(version, _MIN_VERSION_FOR_UPDATE_MODE_APPEND)
with _append_capability_lock:
# Re-check after acquiring the lock in case a concurrent probe filled it.
cached = _append_capability_cache.get(api_url)
if cached is None:
_append_capability_cache[api_url] = supported
else:
supported = cached
if not supported:
logger.warning(
"Hindsight API at %s reports version %r, older than %s. "
"Falling back to per-process document_id — retains across "
"processes/sessions create separate documents instead of "
"appending to a session-scoped one. Upgrade Hindsight to "
"%s+ to enable update_mode='append' deduplication.",
api_url, version, _MIN_VERSION_FOR_UPDATE_MODE_APPEND,
_MIN_VERSION_FOR_UPDATE_MODE_APPEND,
)
else:
logger.debug("Hindsight API %s version %s supports update_mode='append'",
api_url, version)
return supported
# ---------------------------------------------------------------------------
# Dedicated event loop for Hindsight async calls (one per process, reused).
# Avoids creating ephemeral loops that leak aiohttp sessions.
@ -918,6 +1013,40 @@ class HindsightMemoryProvider(MemoryProvider):
self._client = client
return self._run_sync(operation(client))
def _probe_url(self) -> str:
"""Return the URL to probe /version on.
For local_embedded the daemon is on a per-profile dynamic port,
so we prefer the running client's URL when available; otherwise
fall back to the configured api_url.
"""
if self._mode == "local_embedded" and self._client is not None:
url = getattr(self._client, "url", None)
if url:
return str(url)
return self._api_url or ""
def _resolve_retain_target(self, fallback_document_id: str) -> tuple[str, str | None]:
"""Pick (document_id, update_mode) based on live API capability.
On Hindsight 0.5.0 the API supports ``update_mode='append'``,
which lets us reuse a stable session-scoped ``document_id`` across
process lifecycles without overwriting prior turns. On older APIs
we fall back to *fallback_document_id* (the per-process unique
``f"{session_id}-{start_ts}"`` minted at initialize / switch time)
and don't pass ``update_mode`` at all — that's the only way the
resume-overwrite fix (#6654) keeps working on legacy servers.
Probe is cached at module level per API URL, so this is one HTTP
round-trip per (process, api_url) pair regardless of how many
retains fire.
"""
if not self._session_id:
return fallback_document_id, None
if _check_api_supports_update_mode_append(self._probe_url(), self._api_key):
return self._session_id, "append"
return fallback_document_id, None
def initialize(self, session_id: str, **kwargs) -> None:
self._session_id = str(session_id or "").strip()
self._parent_session_id = str(kwargs.get("parent_session_id", "") or "").strip()
@ -1319,7 +1448,7 @@ class HindsightMemoryProvider(MemoryProvider):
turn_index=self._turn_index,
)
num_turns = len(self._session_turns)
document_id = self._document_id
document_id, update_mode = self._resolve_retain_target(self._document_id)
bank_id = self._bank_id
retain_async_flag = self._retain_async
retain_context = self._retain_context
@ -1333,8 +1462,10 @@ class HindsightMemoryProvider(MemoryProvider):
)
item.pop("bank_id", None)
item.pop("retain_async", None)
logger.debug("Hindsight retain: bank=%s, doc=%s, async=%s, content_len=%d, num_turns=%d",
bank_id, document_id, retain_async_flag, len(content), num_turns)
if update_mode is not None:
item["update_mode"] = update_mode
logger.debug("Hindsight retain: bank=%s, doc=%s, mode=%s, async=%s, content_len=%d, num_turns=%d",
bank_id, document_id, update_mode, retain_async_flag, len(content), num_turns)
self._run_hindsight_operation(
lambda client: client.aretain_batch(
bank_id=bank_id,
@ -1471,7 +1602,6 @@ class HindsightMemoryProvider(MemoryProvider):
if self._session_turns:
old_turns = list(self._session_turns)
old_session_id = self._session_id
old_document_id = self._document_id
old_parent_session_id = self._parent_session_id
old_turn_index = self._turn_index
old_metadata = self._build_metadata(
@ -1484,6 +1614,13 @@ class HindsightMemoryProvider(MemoryProvider):
if old_parent_session_id:
old_lineage_tags.append(f"parent:{old_parent_session_id}")
old_content = "[" + ",".join(old_turns) + "]"
# Resolve doc_id + update_mode against the OLD session BEFORE
# we rotate _session_id, so the flush lands in the old
# session's document either way (legacy: per-process unique;
# ≥0.5.0: stable session-scoped + append).
old_document_id, old_update_mode = self._resolve_retain_target(
self._document_id
)
def _flush():
try:
@ -1495,9 +1632,11 @@ class HindsightMemoryProvider(MemoryProvider):
)
item.pop("bank_id", None)
item.pop("retain_async", None)
if old_update_mode is not None:
item["update_mode"] = old_update_mode
logger.debug(
"Hindsight flush-on-switch: bank=%s, doc=%s, num_turns=%d",
self._bank_id, old_document_id, len(old_turns),
"Hindsight flush-on-switch: bank=%s, doc=%s, mode=%s, num_turns=%d",
self._bank_id, old_document_id, old_update_mode, len(old_turns),
)
self._run_hindsight_operation(
lambda client: client.aretain_batch(

View file

@ -248,6 +248,14 @@ def _make_hindsight_provider():
provider._atexit_registered = True
provider._ensure_writer = lambda: None
provider._register_atexit = lambda: None
# Mode + API state used by _resolve_retain_target; stub the resolver
# so tests don't actually probe the API. Real probe behavior is
# exercised by tests in tests/plugins/memory/test_hindsight_provider.py.
provider._mode = "cloud"
provider._api_url = ""
provider._api_key = ""
provider._client = None
provider._resolve_retain_target = lambda fb: (fb, None)
# Stub the network-touching helper so any enqueued flush closure is
# a no-op if ever drained in a unit test.
provider._run_hindsight_operation = lambda _op: None

View file

@ -1072,6 +1072,110 @@ class TestSessionSwitchBufferFlush:
assert call_order[1] == "3"
# ---------------------------------------------------------------------------
# update_mode='append' capability probe + retain dispatch
# ---------------------------------------------------------------------------
class TestUpdateModeAppendCapability:
def _clear_capability_cache(self):
from plugins.memory.hindsight import _append_capability_cache, _append_capability_lock
with _append_capability_lock:
_append_capability_cache.clear()
def test_legacy_api_falls_back_to_per_process_doc_id(self, provider, monkeypatch):
"""API returns no /version (or pre-0.5.0) — sync_turn must use the
per-process unique doc_id and NOT pass update_mode."""
self._clear_capability_cache()
monkeypatch.setattr(
"plugins.memory.hindsight._fetch_hindsight_api_version",
lambda *a, **kw: None,
)
old_doc = provider._document_id
provider.sync_turn("hello", "hi")
provider._retain_queue.join()
kw = provider._client.aretain_batch.call_args.kwargs
assert kw["document_id"] == old_doc
assert kw["document_id"].startswith("test-session-")
item = kw["items"][0]
assert "update_mode" not in item
def test_modern_api_uses_stable_doc_id_with_append(self, provider, monkeypatch):
"""API on >=0.5.0 — retain uses stable session_id and sets update_mode='append'."""
self._clear_capability_cache()
monkeypatch.setattr(
"plugins.memory.hindsight._fetch_hindsight_api_version",
lambda *a, **kw: "0.5.6",
)
provider.sync_turn("hello", "hi")
provider._retain_queue.join()
kw = provider._client.aretain_batch.call_args.kwargs
# Stable: just the session id, no per-process timestamp suffix.
assert kw["document_id"] == "test-session"
item = kw["items"][0]
assert item["update_mode"] == "append"
def test_capability_cached_per_url(self, provider, monkeypatch):
"""The /version probe must run at most once per (process, api_url)."""
self._clear_capability_cache()
calls = {"n": 0}
def _spy(*a, **kw):
calls["n"] += 1
return "0.5.6"
monkeypatch.setattr(
"plugins.memory.hindsight._fetch_hindsight_api_version", _spy
)
provider.sync_turn("a", "b")
provider._retain_queue.join()
provider.sync_turn("c", "d")
provider._retain_queue.join()
assert calls["n"] == 1
def test_legacy_warning_emitted_once(self, provider, monkeypatch, caplog):
"""One-time WARN nudges users to upgrade Hindsight."""
import logging
self._clear_capability_cache()
monkeypatch.setattr(
"plugins.memory.hindsight._fetch_hindsight_api_version",
lambda *a, **kw: "0.4.22",
)
with caplog.at_level(logging.WARNING, logger="plugins.memory.hindsight"):
provider.sync_turn("a", "b")
provider._retain_queue.join()
provider.sync_turn("c", "d")
provider._retain_queue.join()
warns = [r for r in caplog.records
if r.levelno == logging.WARNING
and "older than 0.5.0" in r.getMessage()]
# Cache hit on the second call → no second warn.
assert len(warns) == 1
def test_session_switch_flush_picks_capability_against_old_session(
self, provider_with_config, monkeypatch
):
"""When the API supports append, the flush on /reset must land
in the OLD session's stable document, not a per-process id."""
self._clear_capability_cache()
monkeypatch.setattr(
"plugins.memory.hindsight._fetch_hindsight_api_version",
lambda *a, **kw: "0.5.6",
)
p = provider_with_config(retain_every_n_turns=3, retain_async=False)
p.sync_turn("turn1-user", "turn1-asst")
p.sync_turn("turn2-user", "turn2-asst")
p.on_session_switch("new-sid", parent_session_id="test-session", reset=True)
p._retain_queue.join()
kw = p._client.aretain_batch.call_args.kwargs
# Flush goes to the OLD session's stable doc, not new-sid's.
assert kw["document_id"] == "test-session"
assert kw["items"][0]["update_mode"] == "append"
# ---------------------------------------------------------------------------
# System prompt tests
# ---------------------------------------------------------------------------