mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
fix(hindsight): send only new-turn delta on append retains instead of whole session (#40605)
Closes #40503. Salvaged from #40519; re-verified on main, tightened, tested. Co-authored-by: skylarbpayne <skylarbpayne@users.noreply.github.com>
This commit is contained in:
parent
dde9c0d19d
commit
09d66037f8
2 changed files with 80 additions and 9 deletions
|
|
@ -575,6 +575,10 @@ class HindsightMemoryProvider(MemoryProvider):
|
|||
self._retain_context = "conversation between Hermes Agent and the User"
|
||||
self._turn_counter = 0
|
||||
self._session_turns: list[str] = [] # accumulates ALL turns for the session
|
||||
# How many turns the last append-mode retain already shipped. Used to
|
||||
# send only the new delta on subsequent retains when the API supports
|
||||
# update_mode='append' (legacy/overwrite path still sends everything).
|
||||
self._last_retained_turn_count = 0
|
||||
|
||||
# Recall controls
|
||||
self._auto_recall = True
|
||||
|
|
@ -1119,6 +1123,7 @@ class HindsightMemoryProvider(MemoryProvider):
|
|||
self._agent_workspace = str(kwargs.get("agent_workspace") or "").strip()
|
||||
self._turn_index = 0
|
||||
self._session_turns = []
|
||||
self._last_retained_turn_count = 0
|
||||
self._mode = self._config.get("mode", "cloud")
|
||||
# Read timeout from config or env var, fall back to default
|
||||
self._timeout = _parse_int_setting(
|
||||
|
|
@ -1461,9 +1466,24 @@ class HindsightMemoryProvider(MemoryProvider):
|
|||
self._turn_counter, self._turn_counter + (self._retain_every_n_turns - self._turn_counter % self._retain_every_n_turns))
|
||||
return
|
||||
|
||||
logger.debug("sync_turn: retaining %d turns, total session content %d chars",
|
||||
len(self._session_turns), sum(len(t) for t in self._session_turns))
|
||||
content = "[" + ",".join(self._session_turns) + "]"
|
||||
document_id, update_mode = self._resolve_retain_target(self._document_id)
|
||||
|
||||
# On append-capable APIs each retain only needs to ship the turns
|
||||
# accumulated since the last retain — the server appends them to the
|
||||
# existing document. On legacy/overwrite APIs we must resend the whole
|
||||
# session because each retain replaces the document.
|
||||
if update_mode == "append":
|
||||
turns_to_retain = self._session_turns[self._last_retained_turn_count:]
|
||||
if not turns_to_retain:
|
||||
logger.debug("sync_turn: skipped append retain; no new turns since last retain")
|
||||
return
|
||||
else:
|
||||
turns_to_retain = list(self._session_turns)
|
||||
|
||||
logger.debug("sync_turn: retaining %d/%d turns, payload %d chars",
|
||||
len(turns_to_retain), len(self._session_turns),
|
||||
sum(len(t) for t in turns_to_retain))
|
||||
content = "[" + ",".join(turns_to_retain) + "]"
|
||||
|
||||
lineage_tags: list[str] = []
|
||||
if self._session_id:
|
||||
|
|
@ -1474,11 +1494,10 @@ class HindsightMemoryProvider(MemoryProvider):
|
|||
# Snapshot the state needed for the retain. The writer may run after
|
||||
# _session_turns / _turn_index are mutated by a later sync_turn().
|
||||
metadata_snapshot = self._build_metadata(
|
||||
message_count=len(self._session_turns) * 2,
|
||||
message_count=len(turns_to_retain) * 2,
|
||||
turn_index=self._turn_index,
|
||||
)
|
||||
num_turns = len(self._session_turns)
|
||||
document_id, update_mode = self._resolve_retain_target(self._document_id)
|
||||
num_turns = len(turns_to_retain)
|
||||
bank_id = self._bank_id
|
||||
retain_async_flag = self._retain_async
|
||||
retain_context = self._retain_context
|
||||
|
|
@ -1509,6 +1528,10 @@ class HindsightMemoryProvider(MemoryProvider):
|
|||
self._ensure_writer()
|
||||
self._register_atexit()
|
||||
self._retain_queue.put(_do_retain)
|
||||
# Advance the append watermark only after the delta is queued, so a
|
||||
# later retain doesn't re-ship turns we've already handed to the writer.
|
||||
if update_mode == "append":
|
||||
self._last_retained_turn_count = len(self._session_turns)
|
||||
|
||||
def get_tool_schemas(self) -> List[Dict[str, Any]]:
|
||||
if self._memory_mode == "context":
|
||||
|
|
@ -1706,6 +1729,7 @@ class HindsightMemoryProvider(MemoryProvider):
|
|||
self._session_turns = []
|
||||
self._turn_counter = 0
|
||||
self._turn_index = 0
|
||||
self._last_retained_turn_count = 0
|
||||
logger.debug(
|
||||
"Hindsight on_session_switch: new_session=%s parent=%s reset=%s doc=%s",
|
||||
self._session_id, self._parent_session_id, reset, self._document_id,
|
||||
|
|
|
|||
|
|
@ -780,8 +780,8 @@ class TestSyncTurn:
|
|||
assert item["metadata"]["turn_index"] == "3"
|
||||
assert item["metadata"]["message_count"] == "6"
|
||||
|
||||
def test_sync_turn_accumulates_full_session(self, provider_with_config):
|
||||
"""Each retain sends the ENTIRE session, not just the latest batch."""
|
||||
def test_sync_turn_accumulates_full_session_without_append_support(self, provider_with_config):
|
||||
"""Legacy/overwrite APIs (no update_mode=append) resend the ENTIRE session each retain."""
|
||||
p = provider_with_config(retain_every_n_turns=2)
|
||||
|
||||
p.sync_turn("turn1-user", "turn1-asst")
|
||||
|
|
@ -795,12 +795,59 @@ class TestSyncTurn:
|
|||
p._retain_queue.join()
|
||||
|
||||
content = p._client.aretain_batch.call_args.kwargs["items"][0]["content"]
|
||||
# Should contain ALL turns from the session
|
||||
# Without append support the document is overwritten, so it must
|
||||
# contain ALL turns from the session.
|
||||
assert "turn1-user" in content
|
||||
assert "turn2-user" in content
|
||||
assert "turn3-user" in content
|
||||
assert "turn4-user" in content
|
||||
|
||||
def test_sync_turn_appends_only_delta_when_append_supported(self, provider_with_config, monkeypatch):
|
||||
"""On append-capable APIs each retain ships only the new turns, not the whole session."""
|
||||
monkeypatch.setattr(
|
||||
"plugins.memory.hindsight._fetch_hindsight_api_version",
|
||||
lambda *a, **kw: "0.5.6",
|
||||
)
|
||||
from plugins.memory.hindsight import _append_capability_cache, _append_capability_lock
|
||||
# Clear before AND after: the capability cache is module-global and keyed
|
||||
# per api_url, so a stale entry would leak into other tests.
|
||||
with _append_capability_lock:
|
||||
_append_capability_cache.clear()
|
||||
try:
|
||||
p = provider_with_config(retain_every_n_turns=2)
|
||||
|
||||
p.sync_turn("turn1-user", "turn1-asst")
|
||||
p.sync_turn("turn2-user", "turn2-asst")
|
||||
p._retain_queue.join()
|
||||
|
||||
first = p._client.aretain_batch.call_args.kwargs
|
||||
first_item = first["items"][0]
|
||||
assert first["document_id"] == "test-session"
|
||||
assert first_item["update_mode"] == "append"
|
||||
assert "turn1-user" in first_item["content"]
|
||||
assert "turn2-user" in first_item["content"]
|
||||
|
||||
p._client.aretain_batch.reset_mock()
|
||||
|
||||
p.sync_turn("turn3-user", "turn3-asst")
|
||||
p.sync_turn("turn4-user", "turn4-asst")
|
||||
p._retain_queue.join()
|
||||
|
||||
second = p._client.aretain_batch.call_args.kwargs
|
||||
second_item = second["items"][0]
|
||||
assert second["document_id"] == "test-session"
|
||||
assert second_item["update_mode"] == "append"
|
||||
# Only the delta — the already-retained turns must NOT be resent.
|
||||
assert "turn1-user" not in second_item["content"]
|
||||
assert "turn2-user" not in second_item["content"]
|
||||
assert "turn3-user" in second_item["content"]
|
||||
assert "turn4-user" in second_item["content"]
|
||||
# message_count reflects only the delta (2 turns -> 4 messages).
|
||||
assert second_item["metadata"]["message_count"] == "4"
|
||||
finally:
|
||||
with _append_capability_lock:
|
||||
_append_capability_cache.clear()
|
||||
|
||||
def test_sync_turn_passes_document_id(self, provider):
|
||||
"""sync_turn should pass document_id (session_id + per-startup ts)."""
|
||||
provider.sync_turn("hello", "hi")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue