fix(honcho): buffer partial memory-context spans across stream deltas

sanitize_context() uses a non-greedy block regex that needs both
<memory-context> open and close tags present in a single string. When a
provider streams the fenced memory block across multiple deltas (typical
for recalled-context leaks — the payload often arrives in 10+ chunks of
1–80 chars), the per-delta sanitize pass stripped the lone open/close tags
via _FENCE_TAG_RE but let the payload in between flow straight to the UI.

Adds StreamingContextScrubber: a small stateful scrubber that tracks
open/close tag pairs across deltas, holds back partial-tag tails at
chunk boundaries, and discards span contents wholesale (including the
system-note line that fragments across deltas).

Wired into _fire_stream_delta; reset per user turn; benign trailing
partial-tag tails are flushed at the end of each model call.  Mid-span
interruption (provider drops closing tag) drops the orphaned content
rather than leaking it — truncated answer > leaked memory.

Follow-up to #13672 (@dontcallmejames).
This commit is contained in:
Erosika 2026-04-24 18:29:50 -04:00
parent 5bc2fbe9f2
commit 6fe522cabd
2 changed files with 148 additions and 2 deletions

View file

@ -63,6 +63,117 @@ def sanitize_context(text: str) -> str:
return text
class StreamingContextScrubber:
    """Stateful scrubber for streaming text that may contain split memory-context spans.

    The one-shot ``sanitize_context`` regex cannot survive chunk boundaries:
    a ``<memory-context>`` opened in one delta and closed in a later delta
    leaks its payload to the UI because the non-greedy block regex needs
    both tags in a single string. This class runs a tiny state machine
    across deltas instead: it tracks whether we are currently inside a
    span, holds back any chunk tail that could be the beginning of a tag,
    and discards everything between the open and close tags (including the
    system-note line).

    Usage::

        scrubber = StreamingContextScrubber()
        for delta in stream:
            visible = scrubber.feed(delta)
            if visible:
                emit(visible)
        trailing = scrubber.flush()  # at end of stream
        if trailing:
            emit(trailing)

    The scrubber is re-entrant per agent instance. Callers building new
    top-level responses (new turn) should create a fresh scrubber or call
    ``reset()``.
    """

    _OPEN_TAG = "<memory-context>"
    _CLOSE_TAG = "</memory-context>"

    def __init__(self) -> None:
        # True while we are between an open tag and its matching close tag.
        self._in_span: bool = False
        # Held-back tail that might be the start of a tag split across chunks.
        self._buf: str = ""

    def reset(self) -> None:
        """Drop all state; equivalent to constructing a fresh scrubber."""
        self._in_span = False
        self._buf = ""

    def feed(self, text: str) -> str:
        """Return the visible portion of ``text`` after scrubbing.

        Any trailing fragment that could be the start of an open/close tag
        is held back in the internal buffer and surfaced on the next
        ``feed()`` call or discarded/emitted by ``flush()``.
        """
        if not text:
            return ""
        pending = self._buf + text
        self._buf = ""
        visible: list[str] = []
        while pending:
            # Inside a span we hunt for the close tag; outside, the open tag.
            target = self._CLOSE_TAG if self._in_span else self._OPEN_TAG
            pos = pending.lower().find(target)
            if pos == -1:
                # Tag not (fully) here — keep any suffix that could still
                # turn out to be the start of the tag on the next delta.
                keep = self._max_partial_suffix(pending, target)
                if self._in_span:
                    # Span interior is never emitted; only the possible
                    # partial close tag is retained for matching.
                    self._buf = pending[-keep:] if keep else ""
                else:
                    if keep:
                        visible.append(pending[:-keep])
                        self._buf = pending[-keep:]
                    else:
                        visible.append(pending)
                break
            if not self._in_span and pos > 0:
                # Text preceding an open tag is ordinary output.
                visible.append(pending[:pos])
            # Skip past the tag itself and flip span state.
            pending = pending[pos + len(target):]
            self._in_span = not self._in_span
        return "".join(visible)

    def flush(self) -> str:
        """Emit any held-back buffer at end-of-stream.

        If we're still inside an unterminated span the remaining content is
        discarded (safer: leaking partial memory context is worse than a
        truncated answer). Otherwise the held-back partial-tag tail is
        emitted verbatim (it turned out not to be a real tag).
        """
        held, self._buf = self._buf, ""
        if self._in_span:
            # Mid-span at end of stream: drop the orphaned content.
            self._in_span = False
            return ""
        return held

    @staticmethod
    def _max_partial_suffix(buf: str, tag: str) -> int:
        """Return the length of the longest buf-suffix that is a tag-prefix.

        Case-insensitive. Returns 0 if no suffix could start the tag.
        """
        low_buf = buf.lower()
        low_tag = tag.lower()
        longest = min(len(low_buf), len(low_tag) - 1)
        for n in range(longest, 0, -1):
            if low_tag.startswith(low_buf[-n:]):
                return n
        return 0
def build_memory_context_block(raw_context: str) -> str:
"""Wrap prefetched memory in a fenced block with system note.

View file

@ -79,7 +79,7 @@ from tools.browser_tool import cleanup_browser
# Agent internals extracted to agent/ package for modularity
from agent.memory_manager import build_memory_context_block, sanitize_context
from agent.memory_manager import StreamingContextScrubber, build_memory_context_block, sanitize_context
from agent.retry_utils import jittered_backoff
from agent.error_classifier import classify_api_error, FailoverReason
from agent.prompt_builder import (
@ -1208,6 +1208,10 @@ class AIAgent:
# Deferred paragraph break flag — set after tool iterations so a
# single "\n\n" is prepended to the next real text delta.
self._stream_needs_break = False
# Stateful scrubber for <memory-context> spans split across stream
# deltas (#5719). sanitize_context() alone can't survive chunk
# boundaries because the block regex needs both tags in one string.
self._stream_context_scrubber = StreamingContextScrubber()
# Visible assistant text already delivered through live token callbacks
# during the current model response. Used to avoid re-sending the same
# commentary when the provider later returns it as a completed interim
@ -5784,6 +5788,20 @@ class AIAgent:
def _reset_stream_delivery_tracking(self) -> None:
    """Reset tracking for text delivered during the current model response."""
    # Surface any benign partial-tag tail the context scrubber is holding so
    # it reaches the UI before state is cleared for the next model call; when
    # the scrubber is mid-span, flush() drops the orphaned content instead.
    scrubber = getattr(self, "_stream_context_scrubber", None)
    if scrubber is not None:
        tail = scrubber.flush()
        if tail:
            for cb in (self.stream_delta_callback, self._stream_callback):
                if cb is None:
                    continue
                # Best-effort delivery: a failing UI callback must not
                # abort stream-state cleanup.
                try:
                    cb(tail)
                except Exception:
                    pass
            self._record_streamed_assistant_text(tail)
    self._current_streamed_assistant_text = ""
def _record_streamed_assistant_text(self, text: str) -> None:
@ -5838,7 +5856,17 @@ class AIAgent:
else:
prepended_break = False
if isinstance(text, str):
text = sanitize_context(self._strip_think_blocks(text or ""))
# Strip <think> blocks first (per-delta is safe for closed pairs; the
# unterminated-tag path is handled downstream by stream_consumer).
# Then feed through the stateful context scrubber so memory-context
# spans split across chunks cannot leak to the UI (#5719).
text = self._strip_think_blocks(text or "")
scrubber = getattr(self, "_stream_context_scrubber", None)
if scrubber is not None:
text = scrubber.feed(text)
else:
# Defensive: legacy callers without the scrubber attribute.
text = sanitize_context(text)
if not prepended_break:
text = text.lstrip("\n")
if not text:
@ -9387,6 +9415,13 @@ class AIAgent:
# Track user turns for memory flush and periodic nudge logic
self._user_turn_count += 1
# Reset the streaming context scrubber at the top of each turn so a
# hung span from a prior interrupted stream can't taint this turn's
# output.
scrubber = getattr(self, "_stream_context_scrubber", None)
if scrubber is not None:
scrubber.reset()
# Preserve the original user message (no nudge injection).
original_user_message = persist_user_message if persist_user_message is not None else user_message