class StreamingContextScrubber:
    """Stateful scrubber for streaming text that may contain split memory-context spans.

    The one-shot ``sanitize_context`` regex cannot survive chunk boundaries:
    a ``<memory_context>`` opened in one delta and closed in a later delta
    leaks its payload to the UI because the non-greedy block regex needs
    both tags in one string. This scrubber runs a small state machine
    across deltas, holding back partial-tag tails and discarding
    everything inside a span (including the system-note line).

    Usage::

        scrubber = StreamingContextScrubber()
        for delta in stream:
            visible = scrubber.feed(delta)
            if visible:
                emit(visible)
        trailing = scrubber.flush()  # at end of stream
        if trailing:
            emit(trailing)

    The scrubber is re-entrant per agent instance. Callers building new
    top-level responses (new turn) should create a fresh scrubber or call
    ``reset()``.
    """

    # NOTE(review): the patch text lost these literals to markup stripping
    # (they appeared as empty strings, which makes ``str.find("")`` return 0
    # and ``feed()`` loop forever). "<memory_context>" is reconstructed from
    # the surrounding docstrings ("memory-context spans") and the sibling
    # ``build_memory_context_block`` helper — confirm against the exact tag
    # that helper emits. Tags must be lowercase: matching below lowercases
    # only the haystack.
    _OPEN_TAG = "<memory_context>"
    _CLOSE_TAG = "</memory_context>"

    def __init__(self) -> None:
        # True while we are between an open tag and its close tag.
        self._in_span: bool = False
        # Held-back tail that might be the start of a split tag.
        self._buf: str = ""

    def reset(self) -> None:
        """Drop all held state so a new turn starts clean."""
        self._in_span = False
        self._buf = ""

    def feed(self, text: str) -> str:
        """Return the visible portion of ``text`` after scrubbing.

        Any trailing fragment that could be the start of an open/close tag
        is held back in the internal buffer and surfaced on the next
        ``feed()`` call or discarded/emitted by ``flush()``.
        """
        if not text:
            return ""
        buf = self._buf + text
        self._buf = ""
        out: list[str] = []

        while buf:
            if self._in_span:
                idx = buf.lower().find(self._CLOSE_TAG)
                if idx == -1:
                    # Hold back a potential partial close tag; drop the rest
                    # (span content must never reach the UI).
                    held = self._max_partial_suffix(buf, self._CLOSE_TAG)
                    self._buf = buf[-held:] if held else ""
                    return "".join(out)
                # Found close — skip span content + tag, continue scanning.
                buf = buf[idx + len(self._CLOSE_TAG):]
                self._in_span = False
            else:
                idx = buf.lower().find(self._OPEN_TAG)
                if idx == -1:
                    # No open tag — emit everything except a tail that could
                    # be the start of a split open tag.
                    held = self._max_partial_suffix(buf, self._OPEN_TAG)
                    if held:
                        out.append(buf[:-held])
                        self._buf = buf[-held:]
                    else:
                        out.append(buf)
                    return "".join(out)
                # Emit text before the tag, enter span.
                if idx > 0:
                    out.append(buf[:idx])
                buf = buf[idx + len(self._OPEN_TAG):]
                self._in_span = True

        return "".join(out)

    def flush(self) -> str:
        """Emit any held-back buffer at end-of-stream.

        If we're still inside an unterminated span the remaining content is
        discarded (safer: leaking partial memory context is worse than a
        truncated answer). Otherwise the held-back partial-tag tail is
        emitted verbatim (it turned out not to be a real tag).
        """
        if self._in_span:
            self._buf = ""
            self._in_span = False
            return ""
        tail = self._buf
        self._buf = ""
        return tail

    @staticmethod
    def _max_partial_suffix(buf: str, tag: str) -> int:
        """Return the length of the longest buf-suffix that is a tag-prefix.

        Case-insensitive. Returns 0 if no suffix could start the tag.
        """
        tag_lower = tag.lower()
        buf_lower = buf.lower()
        # Only proper prefixes matter — a full tag would have been found
        # by the caller's ``find`` already.
        max_check = min(len(buf_lower), len(tag_lower) - 1)
        for i in range(max_check, 0, -1):
            if tag_lower.startswith(buf_lower[-i:]):
                return i
        return 0
diff --git a/run_agent.py b/run_agent.py index 00ab4d22f6..c479ccf0a4 100644 --- a/run_agent.py +++ b/run_agent.py @@ -79,7 +79,7 @@ from tools.browser_tool import cleanup_browser # Agent internals extracted to agent/ package for modularity -from agent.memory_manager import build_memory_context_block, sanitize_context +from agent.memory_manager import StreamingContextScrubber, build_memory_context_block, sanitize_context from agent.retry_utils import jittered_backoff from agent.error_classifier import classify_api_error, FailoverReason from agent.prompt_builder import ( @@ -1208,6 +1208,10 @@ class AIAgent: # Deferred paragraph break flag — set after tool iterations so a # single "\n\n" is prepended to the next real text delta. self._stream_needs_break = False + # Stateful scrubber for spans split across stream + # deltas (#5719). sanitize_context() alone can't survive chunk + # boundaries because the block regex needs both tags in one string. + self._stream_context_scrubber = StreamingContextScrubber() # Visible assistant text already delivered through live token callbacks # during the current model response. Used to avoid re-sending the same # commentary when the provider later returns it as a completed interim @@ -5784,6 +5788,20 @@ class AIAgent: def _reset_stream_delivery_tracking(self) -> None: """Reset tracking for text delivered during the current model response.""" + # Flush any benign partial-tag tail held by the context scrubber so it + # reaches the UI before we clear state for the next model call. If + # the scrubber is mid-span, flush() drops the orphaned content. 
+ scrubber = getattr(self, "_stream_context_scrubber", None) + if scrubber is not None: + tail = scrubber.flush() + if tail: + callbacks = [cb for cb in (self.stream_delta_callback, self._stream_callback) if cb is not None] + for cb in callbacks: + try: + cb(tail) + except Exception: + pass + self._record_streamed_assistant_text(tail) self._current_streamed_assistant_text = "" def _record_streamed_assistant_text(self, text: str) -> None: @@ -5838,7 +5856,17 @@ class AIAgent: else: prepended_break = False if isinstance(text, str): - text = sanitize_context(self._strip_think_blocks(text or "")) + # Strip blocks first (per-delta is safe for closed pairs; the + # unterminated-tag path is handled downstream by stream_consumer). + # Then feed through the stateful context scrubber so memory-context + # spans split across chunks cannot leak to the UI (#5719). + text = self._strip_think_blocks(text or "") + scrubber = getattr(self, "_stream_context_scrubber", None) + if scrubber is not None: + text = scrubber.feed(text) + else: + # Defensive: legacy callers without the scrubber attribute. + text = sanitize_context(text) if not prepended_break: text = text.lstrip("\n") if not text: @@ -9387,6 +9415,13 @@ class AIAgent: # Track user turns for memory flush and periodic nudge logic self._user_turn_count += 1 + # Reset the streaming context scrubber at the top of each turn so a + # hung span from a prior interrupted stream can't taint this turn's + # output. + scrubber = getattr(self, "_stream_context_scrubber", None) + if scrubber is not None: + scrubber.reset() + # Preserve the original user message (no nudge injection). original_user_message = persist_user_message if persist_user_message is not None else user_message