diff --git a/plugins/memory/hindsight/__init__.py b/plugins/memory/hindsight/__init__.py index 2f94c08da38..53f422b2d7c 100644 --- a/plugins/memory/hindsight/__init__.py +++ b/plugins/memory/hindsight/__init__.py @@ -575,6 +575,10 @@ class HindsightMemoryProvider(MemoryProvider): self._retain_context = "conversation between Hermes Agent and the User" self._turn_counter = 0 self._session_turns: list[str] = [] # accumulates ALL turns for the session + # How many turns the last append-mode retain already shipped. Used to + # send only the new delta on subsequent retains when the API supports + # update_mode='append' (legacy/overwrite path still sends everything). + self._last_retained_turn_count = 0 # Recall controls self._auto_recall = True @@ -1119,6 +1123,7 @@ class HindsightMemoryProvider(MemoryProvider): self._agent_workspace = str(kwargs.get("agent_workspace") or "").strip() self._turn_index = 0 self._session_turns = [] + self._last_retained_turn_count = 0 self._mode = self._config.get("mode", "cloud") # Read timeout from config or env var, fall back to default self._timeout = _parse_int_setting( @@ -1461,9 +1466,24 @@ class HindsightMemoryProvider(MemoryProvider): self._turn_counter, self._turn_counter + (self._retain_every_n_turns - self._turn_counter % self._retain_every_n_turns)) return - logger.debug("sync_turn: retaining %d turns, total session content %d chars", - len(self._session_turns), sum(len(t) for t in self._session_turns)) - content = "[" + ",".join(self._session_turns) + "]" + document_id, update_mode = self._resolve_retain_target(self._document_id) + + # On append-capable APIs each retain only needs to ship the turns + # accumulated since the last retain — the server appends them to the + # existing document. On legacy/overwrite APIs we must resend the whole + # session because each retain replaces the document. + if update_mode == "append": + turns_to_retain = self._session_turns[self._last_retained_turn_count:] + if not turns_to_retain: + logger.debug("sync_turn: skipped append retain; no new turns since last retain") + return + else: + turns_to_retain = list(self._session_turns) + + logger.debug("sync_turn: retaining %d/%d turns, payload %d chars", + len(turns_to_retain), len(self._session_turns), + sum(len(t) for t in turns_to_retain)) + content = "[" + ",".join(turns_to_retain) + "]" lineage_tags: list[str] = [] if self._session_id: @@ -1474,11 +1494,10 @@ class HindsightMemoryProvider(MemoryProvider): # Snapshot the state needed for the retain. The writer may run after # _session_turns / _turn_index are mutated by a later sync_turn(). metadata_snapshot = self._build_metadata( - message_count=len(self._session_turns) * 2, + message_count=len(turns_to_retain) * 2, turn_index=self._turn_index, ) - num_turns = len(self._session_turns) - document_id, update_mode = self._resolve_retain_target(self._document_id) + num_turns = len(turns_to_retain) bank_id = self._bank_id retain_async_flag = self._retain_async retain_context = self._retain_context @@ -1509,6 +1528,10 @@ class HindsightMemoryProvider(MemoryProvider): self._ensure_writer() self._register_atexit() self._retain_queue.put(_do_retain) + # Advance the append watermark only after the delta is queued, so a + # later retain doesn't re-ship turns we've already handed to the writer. + if update_mode == "append": + self._last_retained_turn_count = len(self._session_turns) def get_tool_schemas(self) -> List[Dict[str, Any]]: if self._memory_mode == "context": @@ -1706,6 +1729,7 @@ class HindsightMemoryProvider(MemoryProvider): self._session_turns = [] self._turn_counter = 0 self._turn_index = 0 + self._last_retained_turn_count = 0 logger.debug( "Hindsight on_session_switch: new_session=%s parent=%s reset=%s doc=%s", self._session_id, self._parent_session_id, reset, self._document_id, diff --git a/tests/plugins/memory/test_hindsight_provider.py b/tests/plugins/memory/test_hindsight_provider.py index f49c227611a..a7ca66f73f4 100644 --- a/tests/plugins/memory/test_hindsight_provider.py +++ b/tests/plugins/memory/test_hindsight_provider.py @@ -780,8 +780,8 @@ class TestSyncTurn: assert item["metadata"]["turn_index"] == "3" assert item["metadata"]["message_count"] == "6" - def test_sync_turn_accumulates_full_session(self, provider_with_config): - """Each retain sends the ENTIRE session, not just the latest batch.""" + def test_sync_turn_accumulates_full_session_without_append_support(self, provider_with_config): + """Legacy/overwrite APIs (no update_mode=append) resend the ENTIRE session each retain.""" p = provider_with_config(retain_every_n_turns=2) p.sync_turn("turn1-user", "turn1-asst") @@ -795,12 +795,59 @@ class TestSyncTurn: p._retain_queue.join() content = p._client.aretain_batch.call_args.kwargs["items"][0]["content"] - # Should contain ALL turns from the session + # Without append support the document is overwritten, so it must + # contain ALL turns from the session. assert "turn1-user" in content assert "turn2-user" in content assert "turn3-user" in content assert "turn4-user" in content + def test_sync_turn_appends_only_delta_when_append_supported(self, provider_with_config, monkeypatch): + """On append-capable APIs each retain ships only the new turns, not the whole session.""" + monkeypatch.setattr( + "plugins.memory.hindsight._fetch_hindsight_api_version", + lambda *a, **kw: "0.5.6", + ) + from plugins.memory.hindsight import _append_capability_cache, _append_capability_lock + # Clear before AND after: the capability cache is module-global and keyed + # per api_url, so a stale entry would leak into other tests. + with _append_capability_lock: + _append_capability_cache.clear() + try: + p = provider_with_config(retain_every_n_turns=2) + + p.sync_turn("turn1-user", "turn1-asst") + p.sync_turn("turn2-user", "turn2-asst") + p._retain_queue.join() + + first = p._client.aretain_batch.call_args.kwargs + first_item = first["items"][0] + assert first["document_id"] == "test-session" + assert first_item["update_mode"] == "append" + assert "turn1-user" in first_item["content"] + assert "turn2-user" in first_item["content"] + + p._client.aretain_batch.reset_mock() + + p.sync_turn("turn3-user", "turn3-asst") + p.sync_turn("turn4-user", "turn4-asst") + p._retain_queue.join() + + second = p._client.aretain_batch.call_args.kwargs + second_item = second["items"][0] + assert second["document_id"] == "test-session" + assert second_item["update_mode"] == "append" + # Only the delta — the already-retained turns must NOT be resent. + assert "turn1-user" not in second_item["content"] + assert "turn2-user" not in second_item["content"] + assert "turn3-user" in second_item["content"] + assert "turn4-user" in second_item["content"] + # message_count reflects only the delta (2 turns -> 4 messages). + assert second_item["metadata"]["message_count"] == "4" + finally: + with _append_capability_lock: + _append_capability_cache.clear() + def test_sync_turn_passes_document_id(self, provider): """sync_turn should pass document_id (session_id + per-startup ts).""" provider.sync_turn("hello", "hi")