"""Stateful scrubber for reasoning/thinking blocks in streamed assistant text.

A regex strip is correct for a *complete* string, but running it per-delta
destroys the state downstream consumers rely on: when a model streams
``"<think>"``, ``"Let me check their config"``, ``"</think>"`` as three
deltas, a per-delta regex erases the first delta entirely, so a downstream
state machine never sees the open tag, treats the second delta as regular
content, and leaks reasoning to the user.  Consumers with no state machine
of their own (ACP, api_server, TTS) never had any defence at all.

This module centralises the tag-suppression state machine at the upstream
layer so every stream_delta_callback sees text that has already had
reasoning blocks removed.  Partial tags at delta boundaries are held back
until the next delta resolves them, and end-of-stream flushing surfaces
any held-back prose that turned out not to be a real tag.

Usage::

    scrubber = StreamingThinkScrubber()
    for delta in stream:
        visible = scrubber.feed(delta)
        if visible:
            emit(visible)
    tail = scrubber.flush()  # at end of stream
    if tail:
        emit(tail)

The scrubber is re-entrant per agent instance.  Call ``reset()`` at the
top of each new turn so a hung block from an interrupted prior stream
cannot taint the next turn's output.

Tag variants handled (case-insensitive): ``<think>``, ``<thinking>``,
``<reasoning>``, ``<thought>``, ``<REASONING_SCRATCHPAD>``.

Block-boundary rule for opens: an opening tag is only treated as a
reasoning-block opener when it appears at the start of the stream, after a
newline (optionally followed by whitespace), or when only whitespace has
been emitted on the current line.  This prevents prose that *mentions* the
tag name (e.g. ``"use <think> tags here"``) from being incorrectly
suppressed.  Closed pairs (``<think>X</think>``) are always suppressed
regardless of boundary; a closed pair is an intentional, bounded
construct.
"""

from __future__ import annotations

from typing import Tuple

__all__ = ["StreamingThinkScrubber"]


class StreamingThinkScrubber:
    """Stateful scrubber for streaming reasoning/thinking blocks.

    State machine:
      - ``_in_block``: True while inside an opened block, waiting for a
        close tag.  All text inside is discarded.
      - ``_buf``: held-back partial-tag tail.  Emitted / discarded on the
        next ``feed()`` call or by ``flush()``.
      - ``_last_emitted_ended_newline``: True iff the most recent
        emission to the consumer ended with ``\\n``, or nothing has been
        emitted yet (start-of-stream counts as a boundary).  Used to
        decide whether an open tag at buffer position 0 is at a block
        boundary.
    """

    _OPEN_TAG_NAMES: Tuple[str, ...] = (
        "think",
        "thinking",
        "reasoning",
        "thought",
        "REASONING_SCRATCHPAD",
    )

    # Materialise literal tag strings so the hot path does string
    # operations, not regex compilation per feed().
    _OPEN_TAGS: Tuple[str, ...] = tuple(f"<{name}>" for name in _OPEN_TAG_NAMES)
    _CLOSE_TAGS: Tuple[str, ...] = tuple(f"</{name}>" for name in _OPEN_TAG_NAMES)

    # Pre-compute the longest tag (for the partial-tag hold-back bound).
    _MAX_TAG_LEN: int = max(len(tag) for tag in _OPEN_TAGS + _CLOSE_TAGS)

    def __init__(self) -> None:
        self._in_block: bool = False
        self._buf: str = ""
        self._last_emitted_ended_newline: bool = True

    def reset(self) -> None:
        """Reset all state.  Call at the top of every new turn."""
        self._in_block = False
        self._buf = ""
        self._last_emitted_ended_newline = True

    def feed(self, text: str) -> str:
        """Feed one delta; return the scrubbed visible portion.

        May return an empty string when the entire delta is reasoning
        content or is being held back pending resolution of a partial
        tag at the boundary.
        """
        if not text:
            return ""
        buf = self._buf + text
        self._buf = ""
        out: list[str] = []

        while buf:
            if self._in_block:
                # Hunt for the earliest close tag.
                close_idx, close_len = self._find_first_tag(
                    buf, self._CLOSE_TAGS,
                )
                if close_idx == -1:
                    # No close yet — hold back a potential partial
                    # close-tag prefix; discard everything else.
                    held = self._max_partial_suffix(buf, self._CLOSE_TAGS)
                    self._buf = buf[-held:] if held else ""
                    return "".join(out)
                # Found close: discard block content + tag, continue.
                buf = buf[close_idx + close_len:]
                self._in_block = False
            else:
                # Priority 1 — closed <X>...</X> pair anywhere in buf.
                # Closed pairs are always an intentional, bounded
                # construct (even mid-line prose containing an
                # open/close pair is almost certainly a model leaking
                # reasoning inline), so no boundary gating.
                pair = self._find_earliest_closed_pair(buf)
                # Priority 2 — unterminated open tag at a block
                # boundary.  Boundary-gated so prose that mentions
                # '<think>' isn't over-stripped.
                open_idx, open_len = self._find_open_at_boundary(
                    buf, out,
                )

                # Pick whichever match comes earliest in the buffer.
                if pair is not None and (
                    open_idx == -1 or pair[0] <= open_idx
                ):
                    start_idx, end_idx = pair
                    preceding = buf[:start_idx]
                    if preceding:
                        preceding = self._strip_orphan_close_tags(preceding)
                    if preceding:
                        out.append(preceding)
                        self._last_emitted_ended_newline = (
                            preceding.endswith("\n")
                        )
                    buf = buf[end_idx:]
                    continue

                if open_idx != -1:
                    # Unterminated open at boundary — emit preceding,
                    # enter block, continue loop with remainder.
                    preceding = buf[:open_idx]
                    if preceding:
                        preceding = self._strip_orphan_close_tags(preceding)
                    if preceding:
                        out.append(preceding)
                        self._last_emitted_ended_newline = (
                            preceding.endswith("\n")
                        )
                    self._in_block = True
                    buf = buf[open_idx + open_len:]
                    continue

                # No resolvable tag structure in buf.  Hold back any
                # partial-tag prefix at the tail so a split tag across
                # deltas isn't missed, then emit the rest.
                held = self._max_partial_suffix(buf, self._OPEN_TAGS)
                held_close = self._max_partial_suffix(
                    buf, self._CLOSE_TAGS,
                )
                held = max(held, held_close)
                if held:
                    emit_text = buf[:-held]
                    self._buf = buf[-held:]
                else:
                    emit_text = buf
                    self._buf = ""
                if emit_text:
                    emit_text = self._strip_orphan_close_tags(emit_text)
                if emit_text:
                    out.append(emit_text)
                    self._last_emitted_ended_newline = (
                        emit_text.endswith("\n")
                    )
                return "".join(out)

        return "".join(out)

    def flush(self) -> str:
        """End-of-stream flush.

        If still inside an unterminated block, held-back content is
        discarded — leaking partial reasoning is worse than a truncated
        answer.  Otherwise the held-back partial-tag tail is emitted
        verbatim (it turned out not to be a real tag prefix).
        """
        if self._in_block:
            self._buf = ""
            self._in_block = False
            return ""
        tail = self._buf
        self._buf = ""
        if not tail:
            return ""
        tail = self._strip_orphan_close_tags(tail)
        if tail:
            self._last_emitted_ended_newline = tail.endswith("\n")
        return tail

    # ── internal helpers ─────────────────────────────────────────────

    @staticmethod
    def _find_first_tag(
        buf: str, tags: Tuple[str, ...],
    ) -> Tuple[int, int]:
        """Return (earliest_index, tag_length) over *tags*, or (-1, 0).

        Case-insensitive match.
        """
        buf_lower = buf.lower()
        best_idx = -1
        best_len = 0
        for tag in tags:
            idx = buf_lower.find(tag.lower())
            if idx != -1 and (best_idx == -1 or idx < best_idx):
                best_idx = idx
                best_len = len(tag)
        return best_idx, best_len

    def _find_earliest_closed_pair(self, buf: str):
        """Return (start_idx, end_idx) of the earliest closed pair, else None.

        A closed pair is ``<X>...</X>`` of any variant.  Matches are
        case-insensitive and non-greedy (the closest close tag after an
        open tag wins).  When two tag variants could both match, the one
        whose open tag appears earlier wins.
        """
        buf_lower = buf.lower()
        best: "tuple[int, int] | None" = None
        for open_tag, close_tag in zip(self._OPEN_TAGS, self._CLOSE_TAGS):
            open_lower = open_tag.lower()
            close_lower = close_tag.lower()
            open_idx = buf_lower.find(open_lower)
            if open_idx == -1:
                continue
            close_idx = buf_lower.find(
                close_lower, open_idx + len(open_lower),
            )
            if close_idx == -1:
                continue
            end_idx = close_idx + len(close_lower)
            if best is None or open_idx < best[0]:
                best = (open_idx, end_idx)
        return best

    def _find_open_at_boundary(
        self, buf: str, already_emitted: list[str],
    ) -> Tuple[int, int]:
        """Return the earliest block-boundary open-tag (idx, len).

        Returns (-1, 0) if no boundary-legal opener is present.
        """
        buf_lower = buf.lower()
        best_idx = -1
        best_len = 0
        for tag in self._OPEN_TAGS:
            tag_lower = tag.lower()
            search_start = 0
            while True:
                idx = buf_lower.find(tag_lower, search_start)
                if idx == -1:
                    break
                if self._is_block_boundary(buf, idx, already_emitted):
                    if best_idx == -1 or idx < best_idx:
                        best_idx = idx
                        best_len = len(tag)
                    break  # first boundary hit for this tag is enough
                search_start = idx + 1
        return best_idx, best_len

    def _is_block_boundary(
        self, buf: str, idx: int, already_emitted: list[str],
    ) -> bool:
        """True iff position *idx* in *buf* is a block boundary.

        A block boundary is:
          - buf position 0 AND the most recent emission ended with a
            newline (or nothing has been emitted yet)
          - any position whose preceding text on the current line
            (since the last newline in buf) is whitespace-only, AND if
            there is no newline in the preceding buf portion, the most
            recent prior emission ended with a newline
        """
        if idx == 0:
            # Check whether the last already-emitted chunk in THIS
            # feed() call ended with a newline, otherwise fall back to
            # the cross-feed flag.
            if already_emitted:
                return already_emitted[-1].endswith("\n")
            return self._last_emitted_ended_newline
        preceding = buf[:idx]
        last_nl = preceding.rfind("\n")
        if last_nl == -1:
            # No newline in buf before the tag — boundary only if the
            # prior emission ended with a newline AND everything since
            # is whitespace.
            if already_emitted:
                prior_newline = already_emitted[-1].endswith("\n")
            else:
                prior_newline = self._last_emitted_ended_newline
            return prior_newline and preceding.strip() == ""
        # Newline present — text between it and the tag must be
        # whitespace-only.
        return preceding[last_nl + 1:].strip() == ""

    @classmethod
    def _max_partial_suffix(
        cls, buf: str, tags: Tuple[str, ...],
    ) -> int:
        """Return the longest buf-suffix that is a prefix of any tag.

        Only prefixes strictly shorter than the tag itself count
        (full-length suffixes are the tag and are handled as matches,
        not held-back partials).  Case-insensitive.
        """
        if not buf:
            return 0
        buf_lower = buf.lower()
        max_check = min(len(buf_lower), cls._MAX_TAG_LEN - 1)
        for i in range(max_check, 0, -1):
            suffix = buf_lower[-i:]
            for tag in tags:
                tag_lower = tag.lower()
                if len(tag_lower) > i and tag_lower.startswith(suffix):
                    return i
        return 0

    @classmethod
    def _strip_orphan_close_tags(cls, text: str) -> str:
        """Remove any close tags from *text* (orphan-close handling).

        An orphan close tag has no matching open in the current
        scrubber state; it's always noise, stripped together with any
        trailing whitespace so the surrounding prose flows naturally
        (matching the ``\\s*``-consuming behaviour of the legacy regex).
        """
        if "</" not in text:
            return text
        lowered = text.lower()
        for close_tag in cls._CLOSE_TAGS:
            close_lower = close_tag.lower()
            idx = lowered.find(close_lower)
            while idx != -1:
                end = idx + len(close_lower)
                # Consume whitespace that trailed the tag.
                while end < len(text) and text[end].isspace():
                    end += 1
                text = text[:idx] + text[end:]
                lowered = text.lower()
                idx = lowered.find(close_lower, idx)
        return text
-# -# The legacy mtime sentinels are kept ONLY as a last-resort fallback for -# non-git installs (pip install from wheel, sparse clones with no .git -# dir). In those environments ``hermes update`` is not a supported path, -# so the check effectively no-ops — which is the safe behavior: better -# to ship one broken import than to restart on every agent-edit. -_STALE_CODE_SENTINELS: tuple[str, ...] = ( - "hermes_cli/config.py", - "hermes_cli/__init__.py", - "run_agent.py", - "gateway/run.py", - "pyproject.toml", -) - -# Cache git HEAD reads across consecutive messages so a chat burst doesn't -# spawn one subprocess per message. 5s is long enough to collapse a burst -# and short enough that the real post-update detection still fires within -# the user's perceived "next message" window. -_GIT_SHA_CACHE_TTL_SECS = 5.0 - - -def _read_git_head_sha(repo_root: Path) -> Optional[str]: - """Return the git HEAD SHA for ``repo_root``, or None if unavailable. - - Reads ``.git/HEAD`` directly (and follows one level of ref) instead - of shelling out to ``git`` — cheaper, no subprocess tax, works on - gateway hosts that don't have a ``git`` binary on PATH. Returns - None for non-git installs (no ``.git`` dir) or any I/O error; callers - treat None as "can't tell" and skip the check. - - Supports the three layouts we care about: - 1. Main checkout: ``/.git/`` is a directory. - 2. Git worktree: ``/.git`` is a file ``gitdir: `` that - points at ``
/.git/worktrees//``. The worktree's - gitdir has HEAD + index but NOT refs/heads/ — those live in - the main checkout, and ``/commondir`` points - at the main ``.git``. We search both locations for refs. - 3. Packed refs: ``refs/heads/`` is absent on disk but - listed in ``/packed-refs``. - """ - try: - git_dir = repo_root / ".git" - # Worktrees store ``.git`` as a file pointing at gitdir: - if git_dir.is_file(): - try: - content = git_dir.read_text().strip() - if content.startswith("gitdir:"): - git_dir = Path(content.split(":", 1)[1].strip()) - if not git_dir.is_absolute(): - git_dir = (repo_root / git_dir).resolve() - except OSError: - return None - if not git_dir.is_dir(): - return None - - # Figure out the "common" git dir — the one that owns shared refs. - # For a worktree, commondir points at it (relative path, resolve - # against git_dir). For a main checkout, common_dir == git_dir. - common_dir = git_dir - commondir_file = git_dir / "commondir" - if commondir_file.is_file(): - try: - rel = commondir_file.read_text().strip() - candidate = (git_dir / rel).resolve() if rel else git_dir - if candidate.is_dir(): - common_dir = candidate - except OSError: - pass - - head_path = git_dir / "HEAD" - if not head_path.is_file(): - return None - head_content = head_path.read_text().strip() - - if head_content.startswith("ref:"): - # Symbolic ref — follow one level (e.g. ref: refs/heads/main). - # Worktree-local refs (bisect, rebase-merge state) live under - # git_dir; shared refs (refs/heads/*, refs/tags/*) live under - # common_dir. Try git_dir first, then common_dir. - ref_rel = head_content.split(":", 1)[1].strip() - for base in (git_dir, common_dir) if git_dir != common_dir else (git_dir,): - ref_path = base / ref_rel - if ref_path.is_file(): - try: - sha = ref_path.read_text().strip() - except OSError: - continue - if sha: - return sha - # Packed refs fallback — always stored in the common dir. 
- packed = common_dir / "packed-refs" - if packed.is_file(): - try: - for line in packed.read_text().splitlines(): - line = line.strip() - if not line or line.startswith("#") or line.startswith("^"): - continue - parts = line.split(None, 1) - if len(parts) == 2 and parts[1] == ref_rel: - return parts[0] or None - except OSError: - return None - return None - - # Detached HEAD — content is the SHA directly. - return head_content or None - except Exception: - return None - - -def _compute_repo_mtime(repo_root: Path) -> float: - """Return the newest mtime across the stale-code sentinel files. - - Legacy fallback used only for non-git installs (``.git`` missing). - Missing files are ignored (they may not exist on older checkouts). - Returns 0.0 if no sentinel file is readable — treat that as "can't - tell", which downstream callers interpret as "not stale" to avoid - false-positive restart loops. - """ - newest = 0.0 - for rel in _STALE_CODE_SENTINELS: - try: - st = (repo_root / rel).stat() - except (OSError, FileNotFoundError): - continue - if st.st_mtime > newest: - newest = st.st_mtime - return newest - - def _coerce_gateway_timestamp(value: Any) -> Optional[float]: """Best-effort conversion of stored gateway timestamps to epoch seconds. @@ -1107,13 +960,6 @@ class GatewayRunner: _stop_task: Optional[asyncio.Task] = None _session_model_overrides: Dict[str, Dict[str, str]] = {} _session_reasoning_overrides: Dict[str, Dict[str, Any]] = {} - # Stale-code self-check defaults (see _detect_stale_code()). Class-level - # so tests that construct GatewayRunner via ``object.__new__`` without - # running __init__ don't crash when _handle_message reads these. 
- _boot_wall_time: float = 0.0 - _boot_repo_mtime: float = 0.0 - _boot_git_sha: Optional[str] = None - _stale_code_restart_triggered: bool = False def __init__(self, config: Optional[GatewayConfig] = None): global _gateway_runner_ref @@ -1122,30 +968,6 @@ class GatewayRunner: self._warn_if_docker_media_delivery_is_risky() _gateway_runner_ref = _weakref.ref(self) - # Boot-time snapshot used by the stale-code self-check. Captured - # before any work happens so post-update file writes are guaranteed - # to have newer mtimes. See _detect_stale_code() / Issue #17648. - try: - self._boot_wall_time: float = time.time() - self._repo_root_for_staleness: Path = Path(__file__).resolve().parent.parent - self._boot_git_sha: Optional[str] = _read_git_head_sha( - self._repo_root_for_staleness, - ) - self._boot_repo_mtime: float = _compute_repo_mtime( - self._repo_root_for_staleness, - ) - except Exception: - self._boot_wall_time = 0.0 - self._repo_root_for_staleness = Path(".") - self._boot_git_sha = None - self._boot_repo_mtime = 0.0 - self._stale_code_notified: set[str] = set() - self._stale_code_restart_triggered: bool = False - # Cached current-SHA read, refreshed at most every - # _GIT_SHA_CACHE_TTL_SECS so bursty chats don't hammer the filesystem. - self._cached_current_sha: Optional[str] = self._boot_git_sha - self._cached_current_sha_at: float = self._boot_wall_time - # Load ephemeral config from config.yaml / env vars. # Both are injected at API-call time only and never persisted. self._prefill_messages = self._load_prefill_messages() @@ -2853,101 +2675,6 @@ class GatewayRunner: task.add_done_callback(self._background_tasks.discard) return True - def _current_git_sha_cached(self) -> Optional[str]: - """Return the current HEAD SHA, cached for _GIT_SHA_CACHE_TTL_SECS. - - A bursty chat (user mashes "hello?" three times) would otherwise - re-read ``.git/HEAD`` on every message. 
Caching collapses that - into a single read and still re-checks within the user's - perceived "next message" window. - """ - now = time.time() - if ( - self._cached_current_sha is not None - and (now - self._cached_current_sha_at) < _GIT_SHA_CACHE_TTL_SECS - ): - return self._cached_current_sha - try: - sha = _read_git_head_sha(self._repo_root_for_staleness) - except Exception: - sha = None - self._cached_current_sha = sha - self._cached_current_sha_at = now - return sha - - def _detect_stale_code(self) -> bool: - """Return True if the git HEAD moved since this process booted. - - A gateway that survives ``hermes update`` (manual SIGTERM never - escalated, systemd restart race, detached-process respawn failed, - etc.) keeps pre-update modules cached in ``sys.modules``. Later - imports of names added post-update — e.g. ``cfg_get`` from PR - #17304 — raise ImportError against the stale module object (see - Issue #17648). - - We compare the git HEAD SHA at boot to the current SHA on disk. - ``hermes update`` always moves HEAD forward via ``git pull``; - agent file edits (the agent patching ``run_agent.py`` or - ``gateway/run.py`` during a self-dev session) never move HEAD. - That makes SHA comparison free of the false-positive class that - the old mtime check suffered from — the agent can edit any file - without triggering a phantom restart. - - Returns False when: - - the boot SHA is unavailable (non-git install, first call - during partial init, etc.); we can't tell and refuse to loop - - the current SHA matches the boot SHA - - reading the current SHA fails for any reason - """ - if not self._boot_wall_time: - return False - if not self._boot_git_sha: - # Non-git install. ``hermes update`` is git-based, so a - # non-git install can't experience the stale-modules class - # this check exists to catch. Return False — no check, no - # false positives. 
(If we ever ship a pip-install update - # path, we'd add a persistent update marker here and compare - # its timestamp to self._boot_wall_time.) - return False - try: - current = self._current_git_sha_cached() - except Exception: - return False - if not current: - return False - return current != self._boot_git_sha - - def _trigger_stale_code_restart(self) -> None: - """Idempotently kick off a graceful restart after stale-code detection. - - Runs at most once per process. The restart request goes through - the normal drain path so in-flight agent turns finish before the - process exits; the service manager (systemd / launchd / detached - profile watcher) then respawns with fresh code. On manual - ``hermes gateway run`` installs without a supervisor, the - process exits and the user must restart by hand — but they get a - user-visible message telling them so. - """ - if self._stale_code_restart_triggered: - return - self._stale_code_restart_triggered = True - current_sha = None - try: - current_sha = self._current_git_sha_cached() - except Exception: - pass - logger.warning( - "Stale-code self-check: git HEAD moved since gateway boot " - "(boot=%s, current=%s) — requesting graceful restart. " - "See Issue #17648.", - (self._boot_git_sha or "?")[:12], - (current_sha or "?")[:12], - ) - try: - self.request_restart(detached=False, via_service=True) - except Exception as exc: - logger.error("Stale-code restart request failed: %s", exc) - async def start(self) -> bool: """ Start the gateway and all configured platform adapters. @@ -4878,27 +4605,6 @@ class GatewayRunner: """ source = event.source - # Stale-code self-check (Issue #17648). A gateway that survives - # ``hermes update`` keeps old modules cached in sys.modules; the - # first inbound message is our earliest safe chance to detect - # this and restart gracefully before we dispatch to the agent - # and hit ImportError on freshly-added names (e.g. cfg_get). 
- # Idempotent — runs the real check at most once per message, and - # request_restart() no-ops after the first call. - try: - if self._detect_stale_code(): - self._trigger_stale_code_restart() - # Acknowledge to the user so they don't see a silent - # drop; the gateway will be back up in a moment via the - # service manager / profile-watcher respawn. - return ( - "⟳ Gateway code was updated in the background — " - "restarting this gateway so your next message runs " - "on the new code. Please retry in a moment." - ) - except Exception as _stale_exc: - logger.debug("Stale-code self-check failed: %s", _stale_exc) - # Internal events (e.g. background-process completion notifications) # are system-generated and must skip user authorization. is_internal = bool(getattr(event, "internal", False)) diff --git a/run_agent.py b/run_agent.py index 4d8ffa1908..3554ff665d 100644 --- a/run_agent.py +++ b/run_agent.py @@ -128,6 +128,7 @@ from tools.browser_tool import cleanup_browser # Agent internals extracted to agent/ package for modularity from agent.memory_manager import StreamingContextScrubber, build_memory_context_block, sanitize_context +from agent.think_scrubber import StreamingThinkScrubber from agent.retry_utils import jittered_backoff from agent.error_classifier import classify_api_error, FailoverReason from agent.prompt_builder import ( @@ -1297,6 +1298,13 @@ class AIAgent: # deltas (#5719). sanitize_context() alone can't survive chunk # boundaries because the block regex needs both tags in one string. self._stream_context_scrubber = StreamingContextScrubber() + # Stateful scrubber for reasoning/thinking tags in streamed deltas + # (#17924). Replaces the per-delta _strip_think_blocks regex that + # destroyed downstream state (e.g. MiniMax-M2.7 streaming + # '' as delta1 and 'Let me check' as delta2 — the regex + # erased delta1, so downstream state machines never learned a + # block was open and leaked delta2 as content). 
+ self._stream_think_scrubber = StreamingThinkScrubber() # Visible assistant text already delivered through live token callbacks # during the current model response. Used to avoid re-sending the same # commentary when the provider later returns it as a completed interim @@ -6543,6 +6551,29 @@ class AIAgent: def _reset_stream_delivery_tracking(self) -> None: """Reset tracking for text delivered during the current model response.""" + # Flush any benign partial-tag tail held by the think scrubber + # first (#17924): an innocent '<' at the end of the stream that + # turned out not to be a tag prefix should reach the UI. Then + # flush the context scrubber. Order matters — the think + # scrubber's output feeds into the context scrubber's state. + think_scrubber = getattr(self, "_stream_think_scrubber", None) + if think_scrubber is not None: + think_tail = think_scrubber.flush() + if think_tail: + # Route the tail through the context scrubber too so a + # memory-context span straddling the final boundary is + # still caught. + ctx_scrubber = getattr(self, "_stream_context_scrubber", None) + if ctx_scrubber is not None: + think_tail = ctx_scrubber.feed(think_tail) + if think_tail: + callbacks = [cb for cb in (self.stream_delta_callback, self._stream_callback) if cb is not None] + for cb in callbacks: + try: + cb(think_tail) + except Exception: + pass + self._record_streamed_assistant_text(think_tail) # Flush any benign partial-tag tail held by the context scrubber so it # reaches the UI before we clear state for the next model call. If # the scrubber is mid-span, flush() drops the orphaned content. @@ -6611,11 +6642,22 @@ class AIAgent: else: prepended_break = False if isinstance(text, str): - # Strip blocks first (per-delta is safe for closed pairs; the - # unterminated-tag path is handled downstream by stream_consumer). + # Suppress reasoning/thinking blocks via the stateful + # scrubber (#17924). 
Earlier versions ran _strip_think_blocks + # per-delta here, which destroyed downstream state machines + # when a tag was split across deltas (e.g. MiniMax-M2.7 + # sends '' and its content as separate deltas — + # regex case 2 erased the first delta, so the CLI/gateway + # state machine never saw the open tag and leaked the + # reasoning content as regular response text). + think_scrubber = getattr(self, "_stream_think_scrubber", None) + if think_scrubber is not None: + text = think_scrubber.feed(text or "") + else: + # Defensive: legacy callers without the scrubber attribute. + text = self._strip_think_blocks(text or "") # Then feed through the stateful context scrubber so memory-context # spans split across chunks cannot leak to the UI (#5719). - text = self._strip_think_blocks(text or "") scrubber = getattr(self, "_stream_context_scrubber", None) if scrubber is not None: text = scrubber.feed(text) @@ -10576,6 +10618,11 @@ class AIAgent: scrubber = getattr(self, "_stream_context_scrubber", None) if scrubber is not None: scrubber.reset() + # Reset the think scrubber for the same reason — an interrupted + # prior stream may have left us inside an unterminated block. + think_scrubber = getattr(self, "_stream_think_scrubber", None) + if think_scrubber is not None: + think_scrubber.reset() # Preserve the original user message (no nudge injection). original_user_message = persist_user_message if persist_user_message is not None else user_message diff --git a/tests/agent/test_think_scrubber.py b/tests/agent/test_think_scrubber.py new file mode 100644 index 0000000000..0f9937d11d --- /dev/null +++ b/tests/agent/test_think_scrubber.py @@ -0,0 +1,229 @@ +"""Tests for StreamingThinkScrubber. + +These tests lock in the contract the scrubber must satisfy so downstream +consumers (ACP, api_server, TTS, CLI, gateway) never see reasoning +blocks leaking through the stream_delta_callback. 
+The scenarios map
+directly to the MiniMax-M2.7 / DeepSeek / Qwen3 streaming patterns that
+break the older per-delta regex strip.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from agent.think_scrubber import StreamingThinkScrubber
+
+
+def _drive(scrubber: StreamingThinkScrubber, deltas: list[str]) -> str:
+    """Feed a sequence of deltas and return the concatenated visible output."""
+    out = [scrubber.feed(d) for d in deltas]
+    out.append(scrubber.flush())
+    return "".join(out)
+
+
+class TestClosedPairs:
+    """Closed <tag>...</tag> pairs are always stripped, regardless of boundary."""
+
+    def test_closed_pair_single_delta(self) -> None:
+        s = StreamingThinkScrubber()
+        assert _drive(s, ["<think>reasoning</think>Hello world"]) == "Hello world"
+
+    def test_closed_pair_surrounded_by_content(self) -> None:
+        s = StreamingThinkScrubber()
+        # Close tag consumes trailing \s*, so only one space survives.
+        assert _drive(s, ["Hello <think>note</think> world"]) == "Hello world"
+
+    @pytest.mark.parametrize(
+        "tag",
+        ["think", "thinking", "reasoning", "thought", "REASONING_SCRATCHPAD"],
+    )
+    def test_all_tag_variants(self, tag: str) -> None:
+        s = StreamingThinkScrubber()
+        delta = f"<{tag}>x</{tag}>Hello"
+        assert _drive(s, [delta]) == "Hello"
+
+    def test_case_insensitive_pair(self) -> None:
+        s = StreamingThinkScrubber()
+        assert _drive(s, ["<THINK>x</ThInK>Hello"]) == "Hello"
+
+
+class TestUnterminatedOpen:
+    """Unterminated open tag discards all subsequent content to end of stream."""
+
+    def test_open_at_stream_start(self) -> None:
+        s = StreamingThinkScrubber()
+        assert _drive(s, ["<think>reasoning text with no close"]) == ""
+
+    def test_open_after_newline(self) -> None:
+        s = StreamingThinkScrubber()
+        # 'Hello\n' is a block boundary for the <think> that follows
+        assert _drive(s, ["Hello\n<think>reasoning"]) == "Hello\n"
+
+    def test_open_after_newline_then_whitespace(self) -> None:
+        s = StreamingThinkScrubber()
+        assert _drive(s, ["Hello\n <think>reasoning"]) == "Hello\n "
+
+    def test_prose_mentioning_tag_not_stripped(self) -> None:
+        """Mid-line '<think>' in prose is preserved (no boundary)."""
+        s = StreamingThinkScrubber()
+        text = "Use the <think> element for reasoning"
+        assert _drive(s, [text]) == text
+
+
+class TestOrphanClose:
+    """Orphan close tags (no prior open) are stripped without boundary check."""
+
+    def test_orphan_close_alone(self) -> None:
+        s = StreamingThinkScrubber()
+        assert _drive(s, ["Hello</think>world"]) == "Helloworld"
+
+    def test_orphan_close_with_trailing_space_consumed(self) -> None:
+        """Matches _strip_think_blocks case 3 \\s* behaviour."""
+        s = StreamingThinkScrubber()
+        assert _drive(s, ["Hello</think> world"]) == "Helloworld"
+
+    def test_multiple_orphan_closes(self) -> None:
+        s = StreamingThinkScrubber()
+        assert _drive(s, ["A</think>B</reasoning>C"]) == "ABC"
+
+
+class TestPartialTagsAcrossDeltas:
+    """Partial tags at delta boundaries must be held back, not emitted raw."""
+
+    def test_split_open_tag_held_back(self) -> None:
+        """'<' arrives alone, 'think>' completes it on next delta."""
+        s = StreamingThinkScrubber()
+        # At stream start, last_emitted_ended_newline=True, so <think> at 0 is boundary
+        assert (
+            _drive(s, ["<", "think>reasoning</think>done"])
+            == "done"
+        )
+
+    def test_split_open_tag_not_at_boundary(self) -> None:
+        """Mid-line split '<' + 'think>X</think>' is a closed pair.
+
+        Closed pairs are always stripped (matching
+        ``_strip_think_blocks`` case 1), even without a block
+        boundary — a closed pair is an intentional bounded construct.
+        """
+        s = StreamingThinkScrubber()
+        out = _drive(s, ["word<", "think>prose</think>more"])
+        assert out == "wordmore"
+
+    def test_split_close_tag_held_back(self) -> None:
+        """Close tag split across deltas still closes the block."""
+        s = StreamingThinkScrubber()
+        assert (
+            _drive(s, ["<think>reasoning<", "/think>after"])
+            == "after"
+        )
+
+    def test_split_close_tag_deep(self) -> None:
+        """Close tag can be split anywhere."""
+        s = StreamingThinkScrubber()
+        assert (
+            _drive(s, ["<think>reasoning</thi", "nk>after"])
+            == "after"
+        )
+
+
+class TestTheMiniMaxScenario:
+    """The exact pattern run_agent per-delta regex strip breaks."""
+
+    def test_minimax_split_open(self) -> None:
+        """delta1='<think>', delta2='Let me check', delta3='</think>', delta4='done'."""
+        s = StreamingThinkScrubber()
+        out = _drive(s, ["<think>", "Let me check their config", "</think>", "done"])
+        assert out == "done"
+
+    def test_minimax_split_open_with_trailing_content(self) -> None:
+        """Reasoning then closes and hands off to final content."""
+        s = StreamingThinkScrubber()
+        out = _drive(
+            s,
+            [
+                "<think>",
+                "The user wants to know if thinking is on",
+                "</think>",
+                "\n\nshow_reasoning: false — thinking is OFF.",
+            ],
+        )
+        assert out == "\n\nshow_reasoning: false — thinking is OFF."
+
+    def test_minimax_unterminated_reasoning_at_end(self) -> None:
+        """Unclosed reasoning at stream end is dropped entirely."""
+        s = StreamingThinkScrubber()
+        out = _drive(s, ["<think>", "The user wants", " to know something"])
+        assert out == ""
+
+
+class TestResetAndReentry:
+    def test_reset_clears_in_block_state(self) -> None:
+        s = StreamingThinkScrubber()
+        s.feed("<think>hanging")
+        assert s._in_block is True
+        s.reset()
+        assert s._in_block is False
+        # After reset, a new turn works cleanly
+        assert _drive(s, ["Hello world"]) == "Hello world"
+
+    def test_reset_clears_buffered_partial_tag(self) -> None:
+        s = StreamingThinkScrubber()
+        s.feed("word<")
+        assert s._buf == "<"
+        s.reset()
+        assert s._buf == ""
+        assert _drive(s, ["fresh content"]) == "fresh content"
+
+
+class TestFlushBehaviour:
+    def test_flush_drops_unterminated_block(self) -> None:
+        s = StreamingThinkScrubber()
+        assert s.feed("<think>reasoning with no close") == ""
+        assert s.flush() == ""
+
+    def test_flush_emits_innocent_partial_tag_tail(self) -> None:
+        """If held-back tail turned out not to be a real tag, emit it."""
+        s = StreamingThinkScrubber()
+        s.feed("word<")  # '<' could be a tag prefix
+        # Stream ends with only '<' held back — emit it as prose.
+        assert s.flush() == "<"
+
+    def test_flush_on_empty_scrubber(self) -> None:
+        s = StreamingThinkScrubber()
+        assert s.flush() == ""
+
+
+class TestRealisticStreaming:
+    """Character-by-character streaming must work as well as larger chunks."""
+
+    def test_char_by_char_closed_pair(self) -> None:
+        s = StreamingThinkScrubber()
+        deltas = list("<think>x</think>Hello world")
+        assert _drive(s, deltas) == "Hello world"
+
+    def test_char_by_char_orphan_close(self) -> None:
+        s = StreamingThinkScrubber()
+        deltas = list("Hello</think>world")
+        assert _drive(s, deltas) == "Helloworld"
+
+    def test_reasoning_then_real_response_first_word_preserved(self) -> None:
+        """Regression: the first word of the final response must NOT be eaten.
+
+        Stefan's screenshot bug — 'Let me check' was being rendered as
+        ' me check'. The scrubber must not consume any character of
+        post-close content.
+        """
+        s = StreamingThinkScrubber()
+        deltas = [
+            "<think>",
+            "User wants to know things",
+            "</think>",
+            "Let me check their config.",
+        ]
+        assert _drive(s, deltas) == "Let me check their config."
+
+    def test_no_tag_passthrough_is_identical(self) -> None:
+        """Streams without any reasoning tags pass through byte-for-byte."""
+        s = StreamingThinkScrubber()
+        deltas = ["Hello ", "world ", "how ", "are ", "you?"]
+        assert _drive(s, deltas) == "Hello world how are you?"
diff --git a/tests/gateway/test_stale_code_self_check.py b/tests/gateway/test_stale_code_self_check.py
deleted file mode 100644
index 64ad347145..0000000000
--- a/tests/gateway/test_stale_code_self_check.py
+++ /dev/null
@@ -1,412 +0,0 @@
-"""Tests for the gateway stale-code self-check (Issue #17648).
-
-A gateway that survives ``hermes update`` keeps pre-update modules cached
-in ``sys.modules``. Later imports of names added post-update (e.g.
-``cfg_get`` from PR #17304) raise ImportError against the stale module
-object.
-
-The self-check compares the git HEAD SHA at boot to the current SHA on
-disk. ``hermes update`` always moves HEAD forward via ``git pull``;
-agent-driven file edits (Hermes editing ``run_agent.py`` / ``gateway/run.py``
-during a self-dev session) never move HEAD — so the SHA signal is free of
-the false-positive class that the earlier mtime-based check suffered from.
-""" - -import os -import time -from pathlib import Path - -import pytest - -from gateway.run import ( - GatewayRunner, - _compute_repo_mtime, - _read_git_head_sha, - _STALE_CODE_SENTINELS, - _GIT_SHA_CACHE_TTL_SECS, -) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _make_tmp_repo(tmp_path: Path) -> Path: - """Create a fake repo with all stale-code sentinel files.""" - for rel in _STALE_CODE_SENTINELS: - p = tmp_path / rel - p.parent.mkdir(parents=True, exist_ok=True) - p.write_text("# test sentinel\n") - return tmp_path - - -def _make_git_repo(tmp_path: Path, sha: str = "a" * 40, branch: str = "main") -> Path: - """Stamp a minimal .git directory so _read_git_head_sha can resolve a SHA. - - We don't run real git — just lay down the files the reader walks - (.git/HEAD pointing at refs/heads/, refs/heads/ - containing the SHA). - """ - git_dir = tmp_path / ".git" - git_dir.mkdir(parents=True, exist_ok=True) - (git_dir / "HEAD").write_text(f"ref: refs/heads/{branch}\n") - refs_dir = git_dir / "refs" / "heads" - refs_dir.mkdir(parents=True, exist_ok=True) - (refs_dir / branch).write_text(f"{sha}\n") - return tmp_path - - -def _set_head_sha(repo_root: Path, sha: str, branch: str = "main") -> None: - """Rewrite the current branch ref to a new SHA (simulates git pull).""" - (repo_root / ".git" / "refs" / "heads" / branch).write_text(f"{sha}\n") - - -def _make_runner( - repo_root: Path, - *, - boot_sha: str | None, - boot_wall: float = None, - boot_mtime: float = 0.0, -): - """Bare GatewayRunner with just the stale-check attributes set.""" - if boot_wall is None: - boot_wall = time.time() - runner = object.__new__(GatewayRunner) - runner._repo_root_for_staleness = repo_root - runner._boot_wall_time = boot_wall - runner._boot_git_sha = boot_sha - runner._boot_repo_mtime = boot_mtime - runner._stale_code_notified = set() - 
runner._stale_code_restart_triggered = False - runner._cached_current_sha = boot_sha - runner._cached_current_sha_at = boot_wall - return runner - - -# --------------------------------------------------------------------------- -# _read_git_head_sha — raw SHA reader -# --------------------------------------------------------------------------- - -def test_read_git_head_sha_branch_ref(tmp_path): - """Resolves ref: refs/heads/ → SHA from refs/heads/.""" - sha = "b" * 40 - _make_git_repo(tmp_path, sha=sha, branch="main") - assert _read_git_head_sha(tmp_path) == sha - - -def test_read_git_head_sha_detached_head(tmp_path): - """Detached HEAD: .git/HEAD contains the SHA directly.""" - sha = "c" * 40 - git_dir = tmp_path / ".git" - git_dir.mkdir() - (git_dir / "HEAD").write_text(f"{sha}\n") - assert _read_git_head_sha(tmp_path) == sha - - -def test_read_git_head_sha_packed_refs(tmp_path): - """Falls back to packed-refs when refs/heads/ is missing.""" - sha = "d" * 40 - git_dir = tmp_path / ".git" - git_dir.mkdir() - (git_dir / "HEAD").write_text("ref: refs/heads/main\n") - # No refs/heads/main file — only packed-refs - (git_dir / "packed-refs").write_text( - f"# pack-refs with: peeled fully-peeled sorted\n" - f"{sha} refs/heads/main\n" - ) - assert _read_git_head_sha(tmp_path) == sha - - -def test_read_git_head_sha_worktree_gitdir_file(tmp_path): - """Worktree: .git is a file with `gitdir: ` pointing to the real git dir. - - Real git worktrees store shared refs (refs/heads/*) in the main - checkout's .git/ and write a ``commondir`` pointer into the - worktree-gitdir. The reader must follow commondir to resolve the - branch ref — this is the layout Hermes dev sessions actually use. 
- """ - sha = "e" * 40 - # Main repo layout - main_repo = tmp_path / "main-repo" - main_git = main_repo / ".git" - (main_git / "refs" / "heads").mkdir(parents=True) - (main_git / "HEAD").write_text("ref: refs/heads/main\n") - (main_git / "refs" / "heads" / "main").write_text("0" * 40 + "\n") - - # Worktree lives in main-repo/.git/worktrees// - worktree_git_dir = main_git / "worktrees" / "feature" - worktree_git_dir.mkdir(parents=True) - (worktree_git_dir / "HEAD").write_text("ref: refs/heads/feature\n") - # commondir points back at the main .git (relative path, "../..") - (worktree_git_dir / "commondir").write_text("../..\n") - # Feature branch ref lives in the shared refs/heads - (main_git / "refs" / "heads" / "feature").write_text(f"{sha}\n") - - # Worktree checkout with .git file pointing at worktree_git_dir - worktree = tmp_path / "wt" - worktree.mkdir() - (worktree / ".git").write_text(f"gitdir: {worktree_git_dir}\n") - - assert _read_git_head_sha(worktree) == sha - - -def test_read_git_head_sha_worktree_packed_refs_in_common(tmp_path): - """Worktree + packed-refs in common dir: fallback still resolves.""" - sha = "f" * 40 - main_repo = tmp_path / "main-repo" - main_git = main_repo / ".git" - main_git.mkdir(parents=True) - (main_git / "HEAD").write_text("ref: refs/heads/main\n") - # packed-refs in the common (main) .git - (main_git / "packed-refs").write_text( - f"# pack-refs with: peeled fully-peeled sorted\n" - f"{sha} refs/heads/feature\n" - ) - - worktree_git_dir = main_git / "worktrees" / "feature" - worktree_git_dir.mkdir(parents=True) - (worktree_git_dir / "HEAD").write_text("ref: refs/heads/feature\n") - (worktree_git_dir / "commondir").write_text("../..\n") - - worktree = tmp_path / "wt" - worktree.mkdir() - (worktree / ".git").write_text(f"gitdir: {worktree_git_dir}\n") - - assert _read_git_head_sha(worktree) == sha - - -def test_read_git_head_sha_no_git_returns_none(tmp_path): - """No .git dir → None (non-git install, safely disables the check).""" 
- assert _read_git_head_sha(tmp_path) is None - - -def test_read_git_head_sha_malformed_head_returns_none(tmp_path): - """Empty HEAD file → None (don't loop on corrupt repos).""" - git_dir = tmp_path / ".git" - git_dir.mkdir() - (git_dir / "HEAD").write_text("") - assert _read_git_head_sha(tmp_path) is None - - -# --------------------------------------------------------------------------- -# _detect_stale_code — the main regression guard -# --------------------------------------------------------------------------- - -def test_detect_stale_code_false_when_sha_unchanged(tmp_path): - """Boot SHA == current SHA → not stale (no restart).""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - # Force fresh read by expiring the cache - runner._cached_current_sha_at = 0.0 - assert runner._detect_stale_code() is False - - -def test_detect_stale_code_true_after_git_pull(tmp_path): - """Boot SHA != current SHA → stale (hermes update happened).""" - boot_sha = "a" * 40 - _make_git_repo(tmp_path, sha=boot_sha) - runner = _make_runner(tmp_path, boot_sha=boot_sha) - # Simulate git pull moving HEAD forward - _set_head_sha(tmp_path, "b" * 40) - runner._cached_current_sha_at = 0.0 # expire cache - assert runner._detect_stale_code() is True - - -def test_detect_stale_code_ignores_agent_file_edits(tmp_path): - """THE CORE REGRESSION: agent edits to source files do NOT trigger restart. - - This is the motivating incident for the SHA-based check. Under the - previous mtime-based scheme, any ``patch`` / ``write_file`` call - against run_agent.py / gateway/run.py / hermes_cli/config.py would - flip the stale-check to True and force a gateway restart on the - next message — even though no update actually happened. SHA - comparison decouples the two: git HEAD only moves on ``git pull``, - never on file writes. 
- """ - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - _make_tmp_repo(tmp_path) # lay down sentinel files too - runner = _make_runner(tmp_path, boot_sha=sha) - - # Simulate the agent editing run_agent.py and gateway/run.py with - # mtimes far into the future — exactly the scenario that used to - # false-positive the old mtime check. - future = time.time() + 10_000 - for rel in _STALE_CODE_SENTINELS: - p = tmp_path / rel - if p.is_file(): - p.write_text("# agent just edited this\n") - os.utime(p, (future, future)) - - # HEAD SHA has NOT moved — check must stay False. - runner._cached_current_sha_at = 0.0 # expire cache - assert runner._detect_stale_code() is False - - -def test_detect_stale_code_false_for_non_git_install(tmp_path): - """Non-git install (no .git dir) → check disabled, never fires.""" - # No .git dir at all; runner's boot_sha is None - runner = _make_runner(tmp_path, boot_sha=None) - # Even if we pretended the current SHA differed, the check should - # short-circuit on boot_sha=None and return False. 
- assert runner._detect_stale_code() is False - - -def test_detect_stale_code_false_when_no_boot_wall_time(tmp_path): - """No boot snapshot at all → can't tell → not stale (no restart loop).""" - runner = _make_runner(tmp_path, boot_sha="a" * 40, boot_wall=0.0) - assert runner._detect_stale_code() is False - - -def test_detect_stale_code_handles_disappearing_git_dir(tmp_path): - """.git vanishes mid-run → current_sha = None → not stale (don't loop).""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - # Nuke the git dir after boot - import shutil - shutil.rmtree(tmp_path / ".git") - runner._cached_current_sha_at = 0.0 # expire cache - assert runner._detect_stale_code() is False - - -# --------------------------------------------------------------------------- -# SHA cache -# --------------------------------------------------------------------------- - -def test_current_sha_cache_collapses_bursts(tmp_path, monkeypatch): - """Consecutive calls inside the TTL window reuse the cached SHA.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - - read_calls = {"n": 0} - real_reader = _read_git_head_sha - - def counting_reader(repo_root): - read_calls["n"] += 1 - return real_reader(repo_root) - - from gateway import run as run_mod - monkeypatch.setattr(run_mod, "_read_git_head_sha", counting_reader) - - # Force cache expiry so the first call definitely reads - runner._cached_current_sha_at = 0.0 - runner._current_git_sha_cached() - first_count = read_calls["n"] - - # Immediate second/third calls should hit cache (no new read) - runner._current_git_sha_cached() - runner._current_git_sha_cached() - assert read_calls["n"] == first_count - - -def test_current_sha_cache_expires_after_ttl(tmp_path, monkeypatch): - """After _GIT_SHA_CACHE_TTL_SECS elapses, a fresh read happens.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, 
boot_sha=sha) - - read_calls = {"n": 0} - real_reader = _read_git_head_sha - - def counting_reader(repo_root): - read_calls["n"] += 1 - return real_reader(repo_root) - - from gateway import run as run_mod - monkeypatch.setattr(run_mod, "_read_git_head_sha", counting_reader) - - runner._cached_current_sha_at = 0.0 - runner._current_git_sha_cached() - first = read_calls["n"] - - # Age the cache past the TTL - runner._cached_current_sha_at = time.time() - (_GIT_SHA_CACHE_TTL_SECS + 1.0) - runner._current_git_sha_cached() - assert read_calls["n"] == first + 1 - - -# --------------------------------------------------------------------------- -# _trigger_stale_code_restart — idempotency preserved -# --------------------------------------------------------------------------- - -def test_trigger_stale_code_restart_is_idempotent(tmp_path): - """Calling _trigger_stale_code_restart twice only requests restart once.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - - calls = [] - - def fake_request_restart(*, detached=False, via_service=False): - calls.append((detached, via_service)) - return True - - runner.request_restart = fake_request_restart - - runner._trigger_stale_code_restart() - runner._trigger_stale_code_restart() - runner._trigger_stale_code_restart() - - assert len(calls) == 1 - assert runner._stale_code_restart_triggered is True - - -def test_trigger_stale_code_restart_survives_request_failure(tmp_path): - """If request_restart raises, we swallow and mark as triggered anyway.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - - def boom(*, detached=False, via_service=False): - raise RuntimeError("no event loop") - - runner.request_restart = boom - - # Should not raise - runner._trigger_stale_code_restart() - - # Marked triggered so we don't retry on every subsequent message - assert runner._stale_code_restart_triggered is True - - -# 
--------------------------------------------------------------------------- -# Class-level defaults — tests that build bare runners via object.__new__ -# --------------------------------------------------------------------------- - -def test_class_level_defaults_prevent_uninitialized_access(): - """Partial construction via object.__new__ must not crash _detect_stale_code.""" - runner = object.__new__(GatewayRunner) - # Don't set any instance attrs — class-level defaults should kick in - runner._repo_root_for_staleness = Path(".") - # _boot_wall_time / _boot_git_sha fall through to class defaults - # (0.0 and None respectively) - assert runner._detect_stale_code() is False - # _stale_code_restart_triggered falls through to class default (False) - assert runner._stale_code_restart_triggered is False - - -# --------------------------------------------------------------------------- -# Legacy mtime reader kept for compatibility — light sanity check only -# --------------------------------------------------------------------------- - -def test_compute_repo_mtime_still_returns_newest(tmp_path): - """_compute_repo_mtime remains available for any legacy callers.""" - repo = _make_tmp_repo(tmp_path) - - baseline = time.time() - 100 - for rel in _STALE_CODE_SENTINELS: - os.utime(repo / rel, (baseline, baseline)) - - newer = time.time() - os.utime(repo / "hermes_cli/config.py", (newer, newer)) - - result = _compute_repo_mtime(repo) - assert abs(result - newer) < 1.0 - - -def test_compute_repo_mtime_missing_files_returns_zero(tmp_path): - """Legacy sanity: missing sentinels → 0.0.""" - assert _compute_repo_mtime(tmp_path) == 0.0