mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(honcho): buffer partial memory-context spans across stream deltas
sanitize_context() uses a non-greedy block regex that needs both <memory-context> open and close tags present in a single string. When a provider streams the fenced memory block across multiple deltas (typical for recalled-context leaks — the payload often arrives in 10+ 1-80 char chunks), the per-delta sanitize stripped the lone open/close tags via _FENCE_TAG_RE but let the payload in between flow straight to the UI. Adds StreamingContextScrubber: a small stateful scrubber that tracks open/close tag pairs across deltas, holds back partial-tag tails at chunk boundaries, and discards span contents wholesale (including the system-note line that fragments across deltas). Wired into _fire_stream_delta; reset per user turn; benign trailing partial-tag tails are flushed at the end of each model call. Mid-span interruption (provider drops closing tag) drops the orphaned content rather than leaking it — truncated answer > leaked memory. Follow-up to #13672 (@dontcallmejames).
This commit is contained in:
parent
5d349ea857
commit
5ce5b17a42
2 changed files with 148 additions and 2 deletions
|
|
@ -63,6 +63,117 @@ def sanitize_context(text: str) -> str:
|
||||||
return text
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
class StreamingContextScrubber:
|
||||||
|
"""Stateful scrubber for streaming text that may contain split memory-context spans.
|
||||||
|
|
||||||
|
The one-shot ``sanitize_context`` regex cannot survive chunk boundaries:
|
||||||
|
a ``<memory-context>`` opened in one delta and closed in a later delta
|
||||||
|
leaks its payload to the UI because the non-greedy block regex needs
|
||||||
|
both tags in one string. This scrubber runs a small state machine
|
||||||
|
across deltas, holding back partial-tag tails and discarding
|
||||||
|
everything inside a span (including the system-note line).
|
||||||
|
|
||||||
|
Usage::
|
||||||
|
|
||||||
|
scrubber = StreamingContextScrubber()
|
||||||
|
for delta in stream:
|
||||||
|
visible = scrubber.feed(delta)
|
||||||
|
if visible:
|
||||||
|
emit(visible)
|
||||||
|
trailing = scrubber.flush() # at end of stream
|
||||||
|
if trailing:
|
||||||
|
emit(trailing)
|
||||||
|
|
||||||
|
The scrubber is re-entrant per agent instance. Callers building new
|
||||||
|
top-level responses (new turn) should create a fresh scrubber or call
|
||||||
|
``reset()``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_OPEN_TAG = "<memory-context>"
|
||||||
|
_CLOSE_TAG = "</memory-context>"
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._in_span: bool = False
|
||||||
|
self._buf: str = ""
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._in_span = False
|
||||||
|
self._buf = ""
|
||||||
|
|
||||||
|
def feed(self, text: str) -> str:
|
||||||
|
"""Return the visible portion of ``text`` after scrubbing.
|
||||||
|
|
||||||
|
Any trailing fragment that could be the start of an open/close tag
|
||||||
|
is held back in the internal buffer and surfaced on the next
|
||||||
|
``feed()`` call or discarded/emitted by ``flush()``.
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return ""
|
||||||
|
buf = self._buf + text
|
||||||
|
self._buf = ""
|
||||||
|
out: list[str] = []
|
||||||
|
|
||||||
|
while buf:
|
||||||
|
if self._in_span:
|
||||||
|
idx = buf.lower().find(self._CLOSE_TAG)
|
||||||
|
if idx == -1:
|
||||||
|
# Hold back a potential partial close tag; drop the rest
|
||||||
|
held = self._max_partial_suffix(buf, self._CLOSE_TAG)
|
||||||
|
self._buf = buf[-held:] if held else ""
|
||||||
|
return "".join(out)
|
||||||
|
# Found close — skip span content + tag, continue
|
||||||
|
buf = buf[idx + len(self._CLOSE_TAG):]
|
||||||
|
self._in_span = False
|
||||||
|
else:
|
||||||
|
idx = buf.lower().find(self._OPEN_TAG)
|
||||||
|
if idx == -1:
|
||||||
|
# No open tag — hold back a potential partial open tag
|
||||||
|
held = self._max_partial_suffix(buf, self._OPEN_TAG)
|
||||||
|
if held:
|
||||||
|
out.append(buf[:-held])
|
||||||
|
self._buf = buf[-held:]
|
||||||
|
else:
|
||||||
|
out.append(buf)
|
||||||
|
return "".join(out)
|
||||||
|
# Emit text before the tag, enter span
|
||||||
|
if idx > 0:
|
||||||
|
out.append(buf[:idx])
|
||||||
|
buf = buf[idx + len(self._OPEN_TAG):]
|
||||||
|
self._in_span = True
|
||||||
|
|
||||||
|
return "".join(out)
|
||||||
|
|
||||||
|
def flush(self) -> str:
|
||||||
|
"""Emit any held-back buffer at end-of-stream.
|
||||||
|
|
||||||
|
If we're still inside an unterminated span the remaining content is
|
||||||
|
discarded (safer: leaking partial memory context is worse than a
|
||||||
|
truncated answer). Otherwise the held-back partial-tag tail is
|
||||||
|
emitted verbatim (it turned out not to be a real tag).
|
||||||
|
"""
|
||||||
|
if self._in_span:
|
||||||
|
self._buf = ""
|
||||||
|
self._in_span = False
|
||||||
|
return ""
|
||||||
|
tail = self._buf
|
||||||
|
self._buf = ""
|
||||||
|
return tail
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _max_partial_suffix(buf: str, tag: str) -> int:
|
||||||
|
"""Return the length of the longest buf-suffix that is a tag-prefix.
|
||||||
|
|
||||||
|
Case-insensitive. Returns 0 if no suffix could start the tag.
|
||||||
|
"""
|
||||||
|
tag_lower = tag.lower()
|
||||||
|
buf_lower = buf.lower()
|
||||||
|
max_check = min(len(buf_lower), len(tag_lower) - 1)
|
||||||
|
for i in range(max_check, 0, -1):
|
||||||
|
if tag_lower.startswith(buf_lower[-i:]):
|
||||||
|
return i
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def build_memory_context_block(raw_context: str) -> str:
|
def build_memory_context_block(raw_context: str) -> str:
|
||||||
"""Wrap prefetched memory in a fenced block with system note.
|
"""Wrap prefetched memory in a fenced block with system note.
|
||||||
|
|
||||||
|
|
|
||||||
39
run_agent.py
39
run_agent.py
|
|
@ -86,7 +86,7 @@ from tools.browser_tool import cleanup_browser
|
||||||
|
|
||||||
|
|
||||||
# Agent internals extracted to agent/ package for modularity
|
# Agent internals extracted to agent/ package for modularity
|
||||||
from agent.memory_manager import build_memory_context_block, sanitize_context
|
from agent.memory_manager import StreamingContextScrubber, build_memory_context_block, sanitize_context
|
||||||
from agent.retry_utils import jittered_backoff
|
from agent.retry_utils import jittered_backoff
|
||||||
from agent.error_classifier import classify_api_error, FailoverReason
|
from agent.error_classifier import classify_api_error, FailoverReason
|
||||||
from agent.prompt_builder import (
|
from agent.prompt_builder import (
|
||||||
|
|
@ -1218,6 +1218,10 @@ class AIAgent:
|
||||||
# Deferred paragraph break flag — set after tool iterations so a
|
# Deferred paragraph break flag — set after tool iterations so a
|
||||||
# single "\n\n" is prepended to the next real text delta.
|
# single "\n\n" is prepended to the next real text delta.
|
||||||
self._stream_needs_break = False
|
self._stream_needs_break = False
|
||||||
|
# Stateful scrubber for <memory-context> spans split across stream
|
||||||
|
# deltas (#5719). sanitize_context() alone can't survive chunk
|
||||||
|
# boundaries because the block regex needs both tags in one string.
|
||||||
|
self._stream_context_scrubber = StreamingContextScrubber()
|
||||||
# Visible assistant text already delivered through live token callbacks
|
# Visible assistant text already delivered through live token callbacks
|
||||||
# during the current model response. Used to avoid re-sending the same
|
# during the current model response. Used to avoid re-sending the same
|
||||||
# commentary when the provider later returns it as a completed interim
|
# commentary when the provider later returns it as a completed interim
|
||||||
|
|
@ -6019,6 +6023,20 @@ class AIAgent:
|
||||||
|
|
||||||
def _reset_stream_delivery_tracking(self) -> None:
|
def _reset_stream_delivery_tracking(self) -> None:
|
||||||
"""Reset tracking for text delivered during the current model response."""
|
"""Reset tracking for text delivered during the current model response."""
|
||||||
|
# Flush any benign partial-tag tail held by the context scrubber so it
|
||||||
|
# reaches the UI before we clear state for the next model call. If
|
||||||
|
# the scrubber is mid-span, flush() drops the orphaned content.
|
||||||
|
scrubber = getattr(self, "_stream_context_scrubber", None)
|
||||||
|
if scrubber is not None:
|
||||||
|
tail = scrubber.flush()
|
||||||
|
if tail:
|
||||||
|
callbacks = [cb for cb in (self.stream_delta_callback, self._stream_callback) if cb is not None]
|
||||||
|
for cb in callbacks:
|
||||||
|
try:
|
||||||
|
cb(tail)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._record_streamed_assistant_text(tail)
|
||||||
self._current_streamed_assistant_text = ""
|
self._current_streamed_assistant_text = ""
|
||||||
|
|
||||||
def _record_streamed_assistant_text(self, text: str) -> None:
|
def _record_streamed_assistant_text(self, text: str) -> None:
|
||||||
|
|
@ -6073,7 +6091,17 @@ class AIAgent:
|
||||||
else:
|
else:
|
||||||
prepended_break = False
|
prepended_break = False
|
||||||
if isinstance(text, str):
|
if isinstance(text, str):
|
||||||
text = sanitize_context(self._strip_think_blocks(text or ""))
|
# Strip <think> blocks first (per-delta is safe for closed pairs; the
|
||||||
|
# unterminated-tag path is handled downstream by stream_consumer).
|
||||||
|
# Then feed through the stateful context scrubber so memory-context
|
||||||
|
# spans split across chunks cannot leak to the UI (#5719).
|
||||||
|
text = self._strip_think_blocks(text or "")
|
||||||
|
scrubber = getattr(self, "_stream_context_scrubber", None)
|
||||||
|
if scrubber is not None:
|
||||||
|
text = scrubber.feed(text)
|
||||||
|
else:
|
||||||
|
# Defensive: legacy callers without the scrubber attribute.
|
||||||
|
text = sanitize_context(text)
|
||||||
if not prepended_break:
|
if not prepended_break:
|
||||||
text = text.lstrip("\n")
|
text = text.lstrip("\n")
|
||||||
if not text:
|
if not text:
|
||||||
|
|
@ -9689,6 +9717,13 @@ class AIAgent:
|
||||||
# Track user turns for memory flush and periodic nudge logic
|
# Track user turns for memory flush and periodic nudge logic
|
||||||
self._user_turn_count += 1
|
self._user_turn_count += 1
|
||||||
|
|
||||||
|
# Reset the streaming context scrubber at the top of each turn so a
|
||||||
|
# hung span from a prior interrupted stream can't taint this turn's
|
||||||
|
# output.
|
||||||
|
scrubber = getattr(self, "_stream_context_scrubber", None)
|
||||||
|
if scrubber is not None:
|
||||||
|
scrubber.reset()
|
||||||
|
|
||||||
# Preserve the original user message (no nudge injection).
|
# Preserve the original user message (no nudge injection).
|
||||||
original_user_message = persist_user_message if persist_user_message is not None else user_message
|
original_user_message = persist_user_message if persist_user_message is not None else user_message
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue