From 2a285d5ec228e5df782957ed1b29d1df44e2a3ae Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 5 May 2026 04:33:38 -0700 Subject: [PATCH] fix(agent): stateful streaming scrubber for reasoning-block leaks (#17924) (#20184) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * revert(gateway): remove stale-code self-check and auto-restart Removes the _detect_stale_code / _trigger_stale_code_restart mechanism introduced in #17648 and iterated in #19740. On every incoming message the gateway compared the boot-time git HEAD SHA to the current SHA on disk, and if they differed it would reply with Gateway code was updated in the background -- restarting this gateway so your next message runs on the new code. Please retry in a moment. and then kick off a graceful restart. This is unwanted behaviour: users who run a long-lived gateway and do their own ad-hoc git operations on the checkout end up with their chat interrupted and the current message dropped every time HEAD moves, with no way to opt out. If an operator really needs the old protection against stale sys.modules after "hermes update", the SIGKILL-survivor sweep in hermes update (hermes_cli/main.py, also tagged #17648) already handles the supervisor-respawn case on its own. Removed: gateway/run.py: - _STALE_CODE_SENTINELS, _GIT_SHA_CACHE_TTL_SECS - _read_git_head_sha(), _compute_repo_mtime() module helpers - class-level _boot_wall_time / _boot_repo_mtime / _boot_git_sha / _stale_code_restart_triggered defaults - __init__ boot-snapshot block (_boot_*, _cached_current_sha*, _repo_root_for_staleness, _stale_code_notified) - _current_git_sha_cached(), _detect_stale_code(), _trigger_stale_code_restart() methods - stale-code check + user-facing restart notice at the top of _handle_message() tests/gateway/test_stale_code_self_check.py (deleted, 412 lines) No new logic added. Zero remaining references to any removed symbol. Gateway test suite passes the same 4589 tests it passed before; the 3 pre-existing unrelated failures (discord free-channel, feishu bot admission, teams typing) are unchanged by this commit. * fix(agent): stateful streaming scrubber for reasoning-block leaks (#17924) Per-delta _strip_think_blocks ran at _fire_stream_delta and destroyed downstream state. When MiniMax-M2.7 / DeepSeek / Qwen3 streamed a tag split across deltas (delta1='', delta2='Let me check'), the regex case-2 match erased delta1 entirely, so CLI/gateway state machines never learned a block was open and leaked delta2 as content. Raw consumers (ACP, api_server, TTS) had no downstream defense at all. Replace the per-delta regex with a stateful StreamingThinkScrubber that survives delta boundaries: - Closed X pairs always stripped (matches _strip_think_blocks case 1). - Unterminated open at block boundary enters a block; content discarded until close tag arrives. At end-of-stream, held content is dropped. - Orphan close tags stripped without boundary gating. - Partial tags at delta boundaries held back until resolved. - Block-boundary rule (start-of-stream, after \n, or whitespace-only since last \n) preserves prose that mentions tag names. Reset at turn start alongside the existing context scrubber; flush at turn end so a benign '<' held back at end-of-stream reaches the UI. E2E-verified on live OpenRouter->MiniMax-m2 streams: closed pairs strip cleanly, first word of post-block content is preserved, pure content passes through unchanged. Stefan's screenshot case (#17924) — 'Let me check' getting chopped to ' me check' — no longer happens. Final _strip_think_blocks calls on completed strings (final_response, replay, compression) are preserved; only the streaming per-delta call site switched to the scrubber. --- agent/think_scrubber.py | 386 ++++++++++++++++++ gateway/run.py | 294 -------------- run_agent.py | 53 ++- tests/agent/test_think_scrubber.py | 229 +++++++++++ tests/gateway/test_stale_code_self_check.py | 412 -------------------- 5 files changed, 665 insertions(+), 709 deletions(-) create mode 100644 agent/think_scrubber.py create mode 100644 tests/agent/test_think_scrubber.py delete mode 100644 tests/gateway/test_stale_code_self_check.py diff --git a/agent/think_scrubber.py b/agent/think_scrubber.py new file mode 100644 index 0000000000..44ddcacff7 --- /dev/null +++ b/agent/think_scrubber.py @@ -0,0 +1,386 @@ +"""Stateful scrubber for reasoning/thinking blocks in streamed assistant text. + +``run_agent._strip_think_blocks`` is regex-based and correct for a complete +string, but when it runs *per-delta* in ``_fire_stream_delta`` it destroys +the state that downstream consumers (CLI ``_stream_delta``, gateway +``GatewayStreamConsumer._filter_and_accumulate``) rely on. + +Concretely, when MiniMax-M2.7 streams + + delta1 = "" + delta2 = "Let me check their config" + delta3 = "" + +the per-delta regex erases delta1 entirely (case 2: unterminated-open at +boundary matches ``^...``), so the downstream state machine never +sees the open tag, treats delta2 as regular content, and leaks reasoning +to the user. Consumers that don't run their own state machine (ACP, +api_server, TTS) never had any defence at all — they just emitted +whatever survived the upstream regex. + +This module centralises the tag-suppression state machine at the +upstream layer so every stream_delta_callback sees text that has +already had reasoning blocks removed. Partial tags at delta +boundaries are held back until the next delta resolves them, and +end-of-stream flushing surfaces any held-back prose that turned out +not to be a real tag. + +Usage:: + + scrubber = StreamingThinkScrubber() + for delta in stream: + visible = scrubber.feed(delta) + if visible: + emit(visible) + tail = scrubber.flush() # at end of stream + if tail: + emit(tail) + +The scrubber is re-entrant per agent instance. Call ``reset()`` at +the top of each new turn so a hung block from an interrupted prior +stream cannot taint the next turn's output. + +Tag variants handled (case-insensitive): + ````, ````, ````, ````, + ````. + +Block-boundary rule for opens: an opening tag is only treated as a +reasoning-block opener when it appears at the start of the stream, +after a newline (optionally followed by whitespace), or when only +whitespace has been emitted on the current line. This prevents prose +that *mentions* the tag name (e.g. ``"use tags here"``) from +being incorrectly suppressed. Closed pairs (``X``) are +always suppressed regardless of boundary; a closed pair is an +intentional, bounded construct. +""" + +from __future__ import annotations + +from typing import Tuple + +__all__ = ["StreamingThinkScrubber"] + + +class StreamingThinkScrubber: + """Stateful scrubber for streaming reasoning/thinking blocks. + + State machine: + - ``_in_block``: True while inside an opened block, waiting for + a close tag. All text inside is discarded. + - ``_buf``: held-back partial-tag tail. Emitted / discarded on + the next ``feed()`` call or by ``flush()``. + - ``_last_emitted_ended_newline``: True iff the most recent + emission to the consumer ended with ``\\n``, or nothing has + been emitted yet (start-of-stream counts as a boundary). Used + to decide whether an open tag at buffer position 0 is at a + block boundary. + """ + + _OPEN_TAG_NAMES: Tuple[str, ...] = ( + "think", + "thinking", + "reasoning", + "thought", + "REASONING_SCRATCHPAD", + ) + + # Materialise literal tag strings so the hot path does string + # operations, not regex compilation per feed(). + _OPEN_TAGS: Tuple[str, ...] = tuple(f"<{name}>" for name in _OPEN_TAG_NAMES) + _CLOSE_TAGS: Tuple[str, ...] = tuple(f"" for name in _OPEN_TAG_NAMES) + + # Pre-compute the longest tag (for partial-tag hold-back bound). + _MAX_TAG_LEN: int = max(len(tag) for tag in _OPEN_TAGS + _CLOSE_TAGS) + + def __init__(self) -> None: + self._in_block: bool = False + self._buf: str = "" + self._last_emitted_ended_newline: bool = True + + def reset(self) -> None: + """Reset all state. Call at the top of every new turn.""" + self._in_block = False + self._buf = "" + self._last_emitted_ended_newline = True + + def feed(self, text: str) -> str: + """Feed one delta; return the scrubbed visible portion. + + May return an empty string when the entire delta is reasoning + content or is being held back pending resolution of a partial + tag at the boundary. + """ + if not text: + return "" + buf = self._buf + text + self._buf = "" + out: list[str] = [] + + while buf: + if self._in_block: + # Hunt for the earliest close tag. + close_idx, close_len = self._find_first_tag( + buf, self._CLOSE_TAGS, + ) + if close_idx == -1: + # No close yet — hold back a potential partial + # close-tag prefix; discard everything else. + held = self._max_partial_suffix(buf, self._CLOSE_TAGS) + self._buf = buf[-held:] if held else "" + return "".join(out) + # Found close: discard block content + tag, continue. + buf = buf[close_idx + close_len:] + self._in_block = False + else: + # Priority 1 — closed X pair anywhere in + # buf. Closed pairs are always an intentional, + # bounded construct (even mid-line prose containing + # an open/close pair is almost certainly a model + # leaking reasoning inline), so no boundary gating. + pair = self._find_earliest_closed_pair(buf) + # Priority 2 — unterminated open tag at a block + # boundary. Boundary-gated so prose that mentions + # '' isn't over-stripped. + open_idx, open_len = self._find_open_at_boundary( + buf, out, + ) + + # Pick whichever match comes earliest in the buffer. + if pair is not None and ( + open_idx == -1 or pair[0] <= open_idx + ): + start_idx, end_idx = pair + preceding = buf[:start_idx] + if preceding: + preceding = self._strip_orphan_close_tags(preceding) + if preceding: + out.append(preceding) + self._last_emitted_ended_newline = ( + preceding.endswith("\n") + ) + buf = buf[end_idx:] + continue + + if open_idx != -1: + # Unterminated open at boundary — emit preceding, + # enter block, continue loop with remainder. + preceding = buf[:open_idx] + if preceding: + preceding = self._strip_orphan_close_tags(preceding) + if preceding: + out.append(preceding) + self._last_emitted_ended_newline = ( + preceding.endswith("\n") + ) + self._in_block = True + buf = buf[open_idx + open_len:] + continue + + # No resolvable tag structure in buf. Hold back any + # partial-tag prefix at the tail so a split tag + # across deltas isn't missed, then emit the rest. + held = self._max_partial_suffix(buf, self._OPEN_TAGS) + held_close = self._max_partial_suffix( + buf, self._CLOSE_TAGS, + ) + held = max(held, held_close) + if held: + emit_text = buf[:-held] + self._buf = buf[-held:] + else: + emit_text = buf + self._buf = "" + if emit_text: + emit_text = self._strip_orphan_close_tags(emit_text) + if emit_text: + out.append(emit_text) + self._last_emitted_ended_newline = ( + emit_text.endswith("\n") + ) + return "".join(out) + + return "".join(out) + + def flush(self) -> str: + """End-of-stream flush. + + If still inside an unterminated block, held-back content is + discarded — leaking partial reasoning is worse than a + truncated answer. Otherwise the held-back partial-tag tail is + emitted verbatim (it turned out not to be a real tag prefix). + """ + if self._in_block: + self._buf = "" + self._in_block = False + return "" + tail = self._buf + self._buf = "" + if not tail: + return "" + tail = self._strip_orphan_close_tags(tail) + if tail: + self._last_emitted_ended_newline = tail.endswith("\n") + return tail + + # ── internal helpers ─────────────────────────────────────────────── + + @staticmethod + def _find_first_tag( + buf: str, tags: Tuple[str, ...], + ) -> Tuple[int, int]: + """Return (earliest_index, tag_length) over *tags*, or (-1, 0). + + Case-insensitive match. + """ + buf_lower = buf.lower() + best_idx = -1 + best_len = 0 + for tag in tags: + idx = buf_lower.find(tag.lower()) + if idx != -1 and (best_idx == -1 or idx < best_idx): + best_idx = idx + best_len = len(tag) + return best_idx, best_len + + def _find_earliest_closed_pair(self, buf: str): + """Return (start_idx, end_idx) of the earliest closed pair, else None. + + A closed pair is ``...`` of any variant. Matches are + case-insensitive and non-greedy (the closest close tag after + an open tag wins), matching the regex ``.*?`` + semantics of ``_strip_think_blocks`` case 1. When two tag + variants could both match, the one whose open tag appears + earlier wins. + """ + buf_lower = buf.lower() + best: "tuple[int, int] | None" = None + for open_tag, close_tag in zip(self._OPEN_TAGS, self._CLOSE_TAGS): + open_lower = open_tag.lower() + close_lower = close_tag.lower() + open_idx = buf_lower.find(open_lower) + if open_idx == -1: + continue + close_idx = buf_lower.find( + close_lower, open_idx + len(open_lower), + ) + if close_idx == -1: + continue + end_idx = close_idx + len(close_lower) + if best is None or open_idx < best[0]: + best = (open_idx, end_idx) + return best + + def _find_open_at_boundary( + self, buf: str, already_emitted: list[str], + ) -> Tuple[int, int]: + """Return the earliest block-boundary open-tag (idx, len). + + Returns (-1, 0) if no boundary-legal opener is present. + """ + buf_lower = buf.lower() + best_idx = -1 + best_len = 0 + for tag in self._OPEN_TAGS: + tag_lower = tag.lower() + search_start = 0 + while True: + idx = buf_lower.find(tag_lower, search_start) + if idx == -1: + break + if self._is_block_boundary(buf, idx, already_emitted): + if best_idx == -1 or idx < best_idx: + best_idx = idx + best_len = len(tag) + break # first boundary hit for this tag is enough + search_start = idx + 1 + return best_idx, best_len + + def _is_block_boundary( + self, buf: str, idx: int, already_emitted: list[str], + ) -> bool: + """True iff position *idx* in *buf* is a block boundary. + + A block boundary is: + - buf position 0 AND the most recent emission ended with + a newline (or nothing has been emitted yet) + - any position whose preceding text on the current line + (since the last newline in buf) is whitespace-only, AND + if there is no newline in the preceding buf portion, the + most recent prior emission ended with a newline + """ + if idx == 0: + # Check whether the last already-emitted chunk in THIS + # feed() call ended with a newline, otherwise fall back + # to the cross-feed flag. + if already_emitted: + return already_emitted[-1].endswith("\n") + return self._last_emitted_ended_newline + preceding = buf[:idx] + last_nl = preceding.rfind("\n") + if last_nl == -1: + # No newline in buf before the tag — boundary only if the + # prior emission ended with a newline AND everything since + # is whitespace. + if already_emitted: + prior_newline = already_emitted[-1].endswith("\n") + else: + prior_newline = self._last_emitted_ended_newline + return prior_newline and preceding.strip() == "" + # Newline present — text between it and the tag must be + # whitespace-only. + return preceding[last_nl + 1:].strip() == "" + + @classmethod + def _max_partial_suffix( + cls, buf: str, tags: Tuple[str, ...], + ) -> int: + """Return the longest buf-suffix that is a prefix of any tag. + + Only prefixes strictly shorter than the tag itself count + (full-length suffixes are the tag and are handled as matches, + not held-back partials). Case-insensitive. + """ + if not buf: + return 0 + buf_lower = buf.lower() + max_check = min(len(buf_lower), cls._MAX_TAG_LEN - 1) + for i in range(max_check, 0, -1): + suffix = buf_lower[-i:] + for tag in tags: + tag_lower = tag.lower() + if len(tag_lower) > i and tag_lower.startswith(suffix): + return i + return 0 + + @classmethod + def _strip_orphan_close_tags(cls, text: str) -> str: + """Remove any close tags from *text* (orphan-close handling). + + An orphan close tag has no matching open in the current + scrubber state; it's always noise, stripped with any trailing + whitespace so the surrounding prose flows naturally. + """ + if " str: _AUTO_CONTINUE_FRESHNESS_SECS_DEFAULT = 60 * 60 -# --- Stale-code self-check ------------------------------------------------ -# Long-running gateway processes that survive an ``hermes update`` keep the -# old ``hermes_cli.config`` (and friends) cached in ``sys.modules``. When -# the updated tool files on disk then try to ``from hermes_cli.config -# import cfg_get`` (added in PR #17304), the import resolves against the -# already-loaded stale module object and raises ``ImportError`` — see -# Issue #17648. Rather than papering over the import failure site-by-site -# in every tool file, detect the stale state centrally and auto-restart -# so the gateway reloads with fresh code. -# -# The signal we use is ``git rev-parse HEAD`` — the only thing ``hermes -# update`` moves that is NOT moved by agent-driven file edits. Earlier -# revisions of this check compared file mtimes across a sentinel set -# (run_agent.py, gateway/run.py, ...), but that produced false positives -# whenever the agent edited its own source files during a session: -# mtime jumps, stale-check fires, gateway restarts, user must retype. -# See the conversation at PR # for the motivating incident. -# -# The legacy mtime sentinels are kept ONLY as a last-resort fallback for -# non-git installs (pip install from wheel, sparse clones with no .git -# dir). In those environments ``hermes update`` is not a supported path, -# so the check effectively no-ops — which is the safe behavior: better -# to ship one broken import than to restart on every agent-edit. -_STALE_CODE_SENTINELS: tuple[str, ...] = ( - "hermes_cli/config.py", - "hermes_cli/__init__.py", - "run_agent.py", - "gateway/run.py", - "pyproject.toml", -) - -# Cache git HEAD reads across consecutive messages so a chat burst doesn't -# spawn one subprocess per message. 5s is long enough to collapse a burst -# and short enough that the real post-update detection still fires within -# the user's perceived "next message" window. -_GIT_SHA_CACHE_TTL_SECS = 5.0 - - -def _read_git_head_sha(repo_root: Path) -> Optional[str]: - """Return the git HEAD SHA for ``repo_root``, or None if unavailable. - - Reads ``.git/HEAD`` directly (and follows one level of ref) instead - of shelling out to ``git`` — cheaper, no subprocess tax, works on - gateway hosts that don't have a ``git`` binary on PATH. Returns - None for non-git installs (no ``.git`` dir) or any I/O error; callers - treat None as "can't tell" and skip the check. - - Supports the three layouts we care about: - 1. Main checkout: ``/.git/`` is a directory. - 2. Git worktree: ``/.git`` is a file ``gitdir: `` that - points at ``
/.git/worktrees//``. The worktree's - gitdir has HEAD + index but NOT refs/heads/ — those live in - the main checkout, and ``/commondir`` points - at the main ``.git``. We search both locations for refs. - 3. Packed refs: ``refs/heads/`` is absent on disk but - listed in ``/packed-refs``. - """ - try: - git_dir = repo_root / ".git" - # Worktrees store ``.git`` as a file pointing at gitdir: - if git_dir.is_file(): - try: - content = git_dir.read_text().strip() - if content.startswith("gitdir:"): - git_dir = Path(content.split(":", 1)[1].strip()) - if not git_dir.is_absolute(): - git_dir = (repo_root / git_dir).resolve() - except OSError: - return None - if not git_dir.is_dir(): - return None - - # Figure out the "common" git dir — the one that owns shared refs. - # For a worktree, commondir points at it (relative path, resolve - # against git_dir). For a main checkout, common_dir == git_dir. - common_dir = git_dir - commondir_file = git_dir / "commondir" - if commondir_file.is_file(): - try: - rel = commondir_file.read_text().strip() - candidate = (git_dir / rel).resolve() if rel else git_dir - if candidate.is_dir(): - common_dir = candidate - except OSError: - pass - - head_path = git_dir / "HEAD" - if not head_path.is_file(): - return None - head_content = head_path.read_text().strip() - - if head_content.startswith("ref:"): - # Symbolic ref — follow one level (e.g. ref: refs/heads/main). - # Worktree-local refs (bisect, rebase-merge state) live under - # git_dir; shared refs (refs/heads/*, refs/tags/*) live under - # common_dir. Try git_dir first, then common_dir. - ref_rel = head_content.split(":", 1)[1].strip() - for base in (git_dir, common_dir) if git_dir != common_dir else (git_dir,): - ref_path = base / ref_rel - if ref_path.is_file(): - try: - sha = ref_path.read_text().strip() - except OSError: - continue - if sha: - return sha - # Packed refs fallback — always stored in the common dir. - packed = common_dir / "packed-refs" - if packed.is_file(): - try: - for line in packed.read_text().splitlines(): - line = line.strip() - if not line or line.startswith("#") or line.startswith("^"): - continue - parts = line.split(None, 1) - if len(parts) == 2 and parts[1] == ref_rel: - return parts[0] or None - except OSError: - return None - return None - - # Detached HEAD — content is the SHA directly. - return head_content or None - except Exception: - return None - - -def _compute_repo_mtime(repo_root: Path) -> float: - """Return the newest mtime across the stale-code sentinel files. - - Legacy fallback used only for non-git installs (``.git`` missing). - Missing files are ignored (they may not exist on older checkouts). - Returns 0.0 if no sentinel file is readable — treat that as "can't - tell", which downstream callers interpret as "not stale" to avoid - false-positive restart loops. - """ - newest = 0.0 - for rel in _STALE_CODE_SENTINELS: - try: - st = (repo_root / rel).stat() - except (OSError, FileNotFoundError): - continue - if st.st_mtime > newest: - newest = st.st_mtime - return newest - - def _coerce_gateway_timestamp(value: Any) -> Optional[float]: """Best-effort conversion of stored gateway timestamps to epoch seconds. @@ -1107,13 +960,6 @@ class GatewayRunner: _stop_task: Optional[asyncio.Task] = None _session_model_overrides: Dict[str, Dict[str, str]] = {} _session_reasoning_overrides: Dict[str, Dict[str, Any]] = {} - # Stale-code self-check defaults (see _detect_stale_code()). Class-level - # so tests that construct GatewayRunner via ``object.__new__`` without - # running __init__ don't crash when _handle_message reads these. - _boot_wall_time: float = 0.0 - _boot_repo_mtime: float = 0.0 - _boot_git_sha: Optional[str] = None - _stale_code_restart_triggered: bool = False def __init__(self, config: Optional[GatewayConfig] = None): global _gateway_runner_ref @@ -1122,30 +968,6 @@ class GatewayRunner: self._warn_if_docker_media_delivery_is_risky() _gateway_runner_ref = _weakref.ref(self) - # Boot-time snapshot used by the stale-code self-check. Captured - # before any work happens so post-update file writes are guaranteed - # to have newer mtimes. See _detect_stale_code() / Issue #17648. - try: - self._boot_wall_time: float = time.time() - self._repo_root_for_staleness: Path = Path(__file__).resolve().parent.parent - self._boot_git_sha: Optional[str] = _read_git_head_sha( - self._repo_root_for_staleness, - ) - self._boot_repo_mtime: float = _compute_repo_mtime( - self._repo_root_for_staleness, - ) - except Exception: - self._boot_wall_time = 0.0 - self._repo_root_for_staleness = Path(".") - self._boot_git_sha = None - self._boot_repo_mtime = 0.0 - self._stale_code_notified: set[str] = set() - self._stale_code_restart_triggered: bool = False - # Cached current-SHA read, refreshed at most every - # _GIT_SHA_CACHE_TTL_SECS so bursty chats don't hammer the filesystem. - self._cached_current_sha: Optional[str] = self._boot_git_sha - self._cached_current_sha_at: float = self._boot_wall_time - # Load ephemeral config from config.yaml / env vars. # Both are injected at API-call time only and never persisted. self._prefill_messages = self._load_prefill_messages() @@ -2853,101 +2675,6 @@ class GatewayRunner: task.add_done_callback(self._background_tasks.discard) return True - def _current_git_sha_cached(self) -> Optional[str]: - """Return the current HEAD SHA, cached for _GIT_SHA_CACHE_TTL_SECS. - - A bursty chat (user mashes "hello?" three times) would otherwise - re-read ``.git/HEAD`` on every message. Caching collapses that - into a single read and still re-checks within the user's - perceived "next message" window. - """ - now = time.time() - if ( - self._cached_current_sha is not None - and (now - self._cached_current_sha_at) < _GIT_SHA_CACHE_TTL_SECS - ): - return self._cached_current_sha - try: - sha = _read_git_head_sha(self._repo_root_for_staleness) - except Exception: - sha = None - self._cached_current_sha = sha - self._cached_current_sha_at = now - return sha - - def _detect_stale_code(self) -> bool: - """Return True if the git HEAD moved since this process booted. - - A gateway that survives ``hermes update`` (manual SIGTERM never - escalated, systemd restart race, detached-process respawn failed, - etc.) keeps pre-update modules cached in ``sys.modules``. Later - imports of names added post-update — e.g. ``cfg_get`` from PR - #17304 — raise ImportError against the stale module object (see - Issue #17648). - - We compare the git HEAD SHA at boot to the current SHA on disk. - ``hermes update`` always moves HEAD forward via ``git pull``; - agent file edits (the agent patching ``run_agent.py`` or - ``gateway/run.py`` during a self-dev session) never move HEAD. - That makes SHA comparison free of the false-positive class that - the old mtime check suffered from — the agent can edit any file - without triggering a phantom restart. - - Returns False when: - - the boot SHA is unavailable (non-git install, first call - during partial init, etc.); we can't tell and refuse to loop - - the current SHA matches the boot SHA - - reading the current SHA fails for any reason - """ - if not self._boot_wall_time: - return False - if not self._boot_git_sha: - # Non-git install. ``hermes update`` is git-based, so a - # non-git install can't experience the stale-modules class - # this check exists to catch. Return False — no check, no - # false positives. (If we ever ship a pip-install update - # path, we'd add a persistent update marker here and compare - # its timestamp to self._boot_wall_time.) - return False - try: - current = self._current_git_sha_cached() - except Exception: - return False - if not current: - return False - return current != self._boot_git_sha - - def _trigger_stale_code_restart(self) -> None: - """Idempotently kick off a graceful restart after stale-code detection. - - Runs at most once per process. The restart request goes through - the normal drain path so in-flight agent turns finish before the - process exits; the service manager (systemd / launchd / detached - profile watcher) then respawns with fresh code. On manual - ``hermes gateway run`` installs without a supervisor, the - process exits and the user must restart by hand — but they get a - user-visible message telling them so. - """ - if self._stale_code_restart_triggered: - return - self._stale_code_restart_triggered = True - current_sha = None - try: - current_sha = self._current_git_sha_cached() - except Exception: - pass - logger.warning( - "Stale-code self-check: git HEAD moved since gateway boot " - "(boot=%s, current=%s) — requesting graceful restart. " - "See Issue #17648.", - (self._boot_git_sha or "?")[:12], - (current_sha or "?")[:12], - ) - try: - self.request_restart(detached=False, via_service=True) - except Exception as exc: - logger.error("Stale-code restart request failed: %s", exc) - async def start(self) -> bool: """ Start the gateway and all configured platform adapters. @@ -4878,27 +4605,6 @@ class GatewayRunner: """ source = event.source - # Stale-code self-check (Issue #17648). A gateway that survives - # ``hermes update`` keeps old modules cached in sys.modules; the - # first inbound message is our earliest safe chance to detect - # this and restart gracefully before we dispatch to the agent - # and hit ImportError on freshly-added names (e.g. cfg_get). - # Idempotent — runs the real check at most once per message, and - # request_restart() no-ops after the first call. - try: - if self._detect_stale_code(): - self._trigger_stale_code_restart() - # Acknowledge to the user so they don't see a silent - # drop; the gateway will be back up in a moment via the - # service manager / profile-watcher respawn. - return ( - "⟳ Gateway code was updated in the background — " - "restarting this gateway so your next message runs " - "on the new code. Please retry in a moment." - ) - except Exception as _stale_exc: - logger.debug("Stale-code self-check failed: %s", _stale_exc) - # Internal events (e.g. background-process completion notifications) # are system-generated and must skip user authorization. is_internal = bool(getattr(event, "internal", False)) diff --git a/run_agent.py b/run_agent.py index 4d8ffa1908..3554ff665d 100644 --- a/run_agent.py +++ b/run_agent.py @@ -128,6 +128,7 @@ from tools.browser_tool import cleanup_browser # Agent internals extracted to agent/ package for modularity from agent.memory_manager import StreamingContextScrubber, build_memory_context_block, sanitize_context +from agent.think_scrubber import StreamingThinkScrubber from agent.retry_utils import jittered_backoff from agent.error_classifier import classify_api_error, FailoverReason from agent.prompt_builder import ( @@ -1297,6 +1298,13 @@ class AIAgent: # deltas (#5719). sanitize_context() alone can't survive chunk # boundaries because the block regex needs both tags in one string. self._stream_context_scrubber = StreamingContextScrubber() + # Stateful scrubber for reasoning/thinking tags in streamed deltas + # (#17924). Replaces the per-delta _strip_think_blocks regex that + # destroyed downstream state (e.g. MiniMax-M2.7 streaming + # '' as delta1 and 'Let me check' as delta2 — the regex + # erased delta1, so downstream state machines never learned a + # block was open and leaked delta2 as content). + self._stream_think_scrubber = StreamingThinkScrubber() # Visible assistant text already delivered through live token callbacks # during the current model response. Used to avoid re-sending the same # commentary when the provider later returns it as a completed interim @@ -6543,6 +6551,29 @@ class AIAgent: def _reset_stream_delivery_tracking(self) -> None: """Reset tracking for text delivered during the current model response.""" + # Flush any benign partial-tag tail held by the think scrubber + # first (#17924): an innocent '<' at the end of the stream that + # turned out not to be a tag prefix should reach the UI. Then + # flush the context scrubber. Order matters — the think + # scrubber's output feeds into the context scrubber's state. + think_scrubber = getattr(self, "_stream_think_scrubber", None) + if think_scrubber is not None: + think_tail = think_scrubber.flush() + if think_tail: + # Route the tail through the context scrubber too so a + # memory-context span straddling the final boundary is + # still caught. + ctx_scrubber = getattr(self, "_stream_context_scrubber", None) + if ctx_scrubber is not None: + think_tail = ctx_scrubber.feed(think_tail) + if think_tail: + callbacks = [cb for cb in (self.stream_delta_callback, self._stream_callback) if cb is not None] + for cb in callbacks: + try: + cb(think_tail) + except Exception: + pass + self._record_streamed_assistant_text(think_tail) # Flush any benign partial-tag tail held by the context scrubber so it # reaches the UI before we clear state for the next model call. If # the scrubber is mid-span, flush() drops the orphaned content. @@ -6611,11 +6642,22 @@ class AIAgent: else: prepended_break = False if isinstance(text, str): - # Strip blocks first (per-delta is safe for closed pairs; the - # unterminated-tag path is handled downstream by stream_consumer). + # Suppress reasoning/thinking blocks via the stateful + # scrubber (#17924). Earlier versions ran _strip_think_blocks + # per-delta here, which destroyed downstream state machines + # when a tag was split across deltas (e.g. MiniMax-M2.7 + # sends '' and its content as separate deltas — + # regex case 2 erased the first delta, so the CLI/gateway + # state machine never saw the open tag and leaked the + # reasoning content as regular response text). + think_scrubber = getattr(self, "_stream_think_scrubber", None) + if think_scrubber is not None: + text = think_scrubber.feed(text or "") + else: + # Defensive: legacy callers without the scrubber attribute. + text = self._strip_think_blocks(text or "") # Then feed through the stateful context scrubber so memory-context # spans split across chunks cannot leak to the UI (#5719). - text = self._strip_think_blocks(text or "") scrubber = getattr(self, "_stream_context_scrubber", None) if scrubber is not None: text = scrubber.feed(text) @@ -10576,6 +10618,11 @@ class AIAgent: scrubber = getattr(self, "_stream_context_scrubber", None) if scrubber is not None: scrubber.reset() + # Reset the think scrubber for the same reason — an interrupted + # prior stream may have left us inside an unterminated block. + think_scrubber = getattr(self, "_stream_think_scrubber", None) + if think_scrubber is not None: + think_scrubber.reset() # Preserve the original user message (no nudge injection). original_user_message = persist_user_message if persist_user_message is not None else user_message diff --git a/tests/agent/test_think_scrubber.py b/tests/agent/test_think_scrubber.py new file mode 100644 index 0000000000..0f9937d11d --- /dev/null +++ b/tests/agent/test_think_scrubber.py @@ -0,0 +1,229 @@ +"""Tests for StreamingThinkScrubber. + +These tests lock in the contract the scrubber must satisfy so downstream +consumers (ACP, api_server, TTS, CLI, gateway) never see reasoning +blocks leaking through the stream_delta_callback. The scenarios map +directly to the MiniMax-M2.7 / DeepSeek / Qwen3 streaming patterns that +break the older per-delta regex strip. +""" + +from __future__ import annotations + +import pytest + +from agent.think_scrubber import StreamingThinkScrubber + + +def _drive(scrubber: StreamingThinkScrubber, deltas: list[str]) -> str: + """Feed a sequence of deltas and return the concatenated visible output.""" + out = [scrubber.feed(d) for d in deltas] + out.append(scrubber.flush()) + return "".join(out) + + +class TestClosedPairs: + """Closed ... pairs are always stripped, regardless of boundary.""" + + def test_closed_pair_single_delta(self) -> None: + s = StreamingThinkScrubber() + assert _drive(s, ["reasoningHello world"]) == "Hello world" + + def test_closed_pair_surrounded_by_content(self) -> None: + s = StreamingThinkScrubber() + assert _drive(s, ["Hello note world"]) == "Hello world" + + @pytest.mark.parametrize( + "tag", + ["think", "thinking", "reasoning", "thought", "REASONING_SCRATCHPAD"], + ) + def test_all_tag_variants(self, tag: str) -> None: + s = StreamingThinkScrubber() + delta = f"<{tag}>xHello" + assert _drive(s, [delta]) == "Hello" + + def test_case_insensitive_pair(self) -> None: + s = StreamingThinkScrubber() + assert _drive(s, ["xHello"]) == "Hello" + + +class TestUnterminatedOpen: + """Unterminated open tag discards all subsequent content to end of stream.""" + + def test_open_at_stream_start(self) -> None: + s = StreamingThinkScrubber() + assert _drive(s, ["reasoning text with no close"]) == "" + + def test_open_after_newline(self) -> None: + s = StreamingThinkScrubber() + # 'Hello\n' is a block boundary for the that follows + assert _drive(s, ["Hello\nreasoning"]) == "Hello\n" + + def test_open_after_newline_then_whitespace(self) -> None: + s = StreamingThinkScrubber() + assert _drive(s, ["Hello\n reasoning"]) == "Hello\n " + + def test_prose_mentioning_tag_not_stripped(self) -> None: + """Mid-line '' in prose is preserved (no boundary).""" + s = StreamingThinkScrubber() + text = "Use the element for reasoning" + assert _drive(s, [text]) == text + + +class TestOrphanClose: + """Orphan close tags (no prior open) are stripped without boundary check.""" + + def test_orphan_close_alone(self) -> None: + s = StreamingThinkScrubber() + assert _drive(s, ["Helloworld"]) == "Helloworld" + + def test_orphan_close_with_trailing_space_consumed(self) -> None: + """Matches _strip_think_blocks case 3 \\s* behaviour.""" + s = StreamingThinkScrubber() + assert _drive(s, ["Hello world"]) == "Helloworld" + + def test_multiple_orphan_closes(self) -> None: + s = StreamingThinkScrubber() + assert _drive(s, ["ABC"]) == "ABC" + + +class TestPartialTagsAcrossDeltas: + """Partial tags at delta boundaries must be held back, not emitted raw.""" + + def test_split_open_tag_held_back(self) -> None: + """'<' arrives alone, 'think>' completes it on next delta.""" + s = StreamingThinkScrubber() + # At stream start, last_emitted_ended_newline=True, so at 0 is boundary + assert ( + _drive(s, ["<", "think>reasoningdone"]) + == "done" + ) + + def test_split_open_tag_not_at_boundary(self) -> None: + """Mid-line split '<' + 'think>X' is a closed pair. + + Closed pairs are always stripped (matching + ``_strip_think_blocks`` case 1), even without a block + boundary — a closed pair is an intentional bounded construct. + """ + s = StreamingThinkScrubber() + out = _drive(s, ["word<", "think>prosemore"]) + assert out == "wordmore" + + def test_split_close_tag_held_back(self) -> None: + """Close tag split across deltas still closes the block.""" + s = StreamingThinkScrubber() + assert ( + _drive(s, ["reasoning<", "/think>after"]) + == "after" + ) + + def test_split_close_tag_deep(self) -> None: + """Close tag can be split anywhere.""" + s = StreamingThinkScrubber() + assert ( + _drive(s, ["reasoningafter"]) + == "after" + ) + + +class TestTheMiniMaxScenario: + """The exact pattern run_agent per-delta regex strip breaks.""" + + def test_minimax_split_open(self) -> None: + """delta1='', delta2='Let me check', delta3='done'.""" + s = StreamingThinkScrubber() + out = _drive(s, ["", "Let me check their config", "", "done"]) + assert out == "done" + + def test_minimax_split_open_with_trailing_content(self) -> None: + """Reasoning then closes and hands off to final content.""" + s = StreamingThinkScrubber() + out = _drive( + s, + [ + "", + "The user wants to know if thinking is on", + "", + "\n\nshow_reasoning: false — thinking is OFF.", + ], + ) + assert out == "\n\nshow_reasoning: false — thinking is OFF." + + def test_minimax_unterminated_reasoning_at_end(self) -> None: + """Unclosed reasoning at stream end is dropped entirely.""" + s = StreamingThinkScrubber() + out = _drive(s, ["", "The user wants", " to know something"]) + assert out == "" + + +class TestResetAndReentry: + def test_reset_clears_in_block_state(self) -> None: + s = StreamingThinkScrubber() + s.feed("hanging") + assert s._in_block is True + s.reset() + assert s._in_block is False + # After reset, a new turn works cleanly + assert _drive(s, ["Hello world"]) == "Hello world" + + def test_reset_clears_buffered_partial_tag(self) -> None: + s = StreamingThinkScrubber() + s.feed("word<") + assert s._buf == "<" + s.reset() + assert s._buf == "" + assert _drive(s, ["fresh content"]) == "fresh content" + + +class TestFlushBehaviour: + def test_flush_drops_unterminated_block(self) -> None: + s = StreamingThinkScrubber() + assert s.feed("reasoning with no close") == "" + assert s.flush() == "" + + def test_flush_emits_innocent_partial_tag_tail(self) -> None: + """If held-back tail turned out not to be a real tag, emit it.""" + s = StreamingThinkScrubber() + s.feed("word<") # '<' could be a tag prefix + # Stream ends with only '<' held back — emit it as prose. + assert s.flush() == "<" + + def test_flush_on_empty_scrubber(self) -> None: + s = StreamingThinkScrubber() + assert s.flush() == "" + + +class TestRealisticStreaming: + """Character-by-character streaming must work as well as larger chunks.""" + + def test_char_by_char_closed_pair(self) -> None: + s = StreamingThinkScrubber() + deltas = list("xHello world") + assert _drive(s, deltas) == "Hello world" + + def test_char_by_char_orphan_close(self) -> None: + s = StreamingThinkScrubber() + deltas = list("Helloworld") + assert _drive(s, deltas) == "Helloworld" + + def test_reasoning_then_real_response_first_word_preserved(self) -> None: + """Regression: the first word of the final response must NOT be eaten. + + Stefan's screenshot bug — 'Let me check' was being rendered as + ' me check'. The scrubber must not consume any character of + post-close content. + """ + s = StreamingThinkScrubber() + deltas = [ + "", + "User wants to know things", + "", + "Let me check their config.", + ] + assert _drive(s, deltas) == "Let me check their config." + + def test_no_tag_passthrough_is_identical(self) -> None: + """Streams without any reasoning tags pass through byte-for-byte.""" + s = StreamingThinkScrubber() + deltas = ["Hello ", "world ", "how ", "are ", "you?"] + assert _drive(s, deltas) == "Hello world how are you?" diff --git a/tests/gateway/test_stale_code_self_check.py b/tests/gateway/test_stale_code_self_check.py deleted file mode 100644 index 64ad347145..0000000000 --- a/tests/gateway/test_stale_code_self_check.py +++ /dev/null @@ -1,412 +0,0 @@ -"""Tests for the gateway stale-code self-check (Issue #17648). - -A gateway that survives ``hermes update`` keeps pre-update modules cached -in ``sys.modules``. Later imports of names added post-update (e.g. -``cfg_get`` from PR #17304) raise ImportError against the stale module -object. - -The self-check compares the git HEAD SHA at boot to the current SHA on -disk. ``hermes update`` always moves HEAD forward via ``git pull``; -agent-driven file edits (Hermes editing ``run_agent.py`` / ``gateway/run.py`` -during a self-dev session) never move HEAD — so the SHA signal is free of -the false-positive class that the earlier mtime-based check suffered from. -""" - -import os -import time -from pathlib import Path - -import pytest - -from gateway.run import ( - GatewayRunner, - _compute_repo_mtime, - _read_git_head_sha, - _STALE_CODE_SENTINELS, - _GIT_SHA_CACHE_TTL_SECS, -) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _make_tmp_repo(tmp_path: Path) -> Path: - """Create a fake repo with all stale-code sentinel files.""" - for rel in _STALE_CODE_SENTINELS: - p = tmp_path / rel - p.parent.mkdir(parents=True, exist_ok=True) - p.write_text("# test sentinel\n") - return tmp_path - - -def _make_git_repo(tmp_path: Path, sha: str = "a" * 40, branch: str = "main") -> Path: - """Stamp a minimal .git directory so _read_git_head_sha can resolve a SHA. - - We don't run real git — just lay down the files the reader walks - (.git/HEAD pointing at refs/heads/, refs/heads/ - containing the SHA). - """ - git_dir = tmp_path / ".git" - git_dir.mkdir(parents=True, exist_ok=True) - (git_dir / "HEAD").write_text(f"ref: refs/heads/{branch}\n") - refs_dir = git_dir / "refs" / "heads" - refs_dir.mkdir(parents=True, exist_ok=True) - (refs_dir / branch).write_text(f"{sha}\n") - return tmp_path - - -def _set_head_sha(repo_root: Path, sha: str, branch: str = "main") -> None: - """Rewrite the current branch ref to a new SHA (simulates git pull).""" - (repo_root / ".git" / "refs" / "heads" / branch).write_text(f"{sha}\n") - - -def _make_runner( - repo_root: Path, - *, - boot_sha: str | None, - boot_wall: float = None, - boot_mtime: float = 0.0, -): - """Bare GatewayRunner with just the stale-check attributes set.""" - if boot_wall is None: - boot_wall = time.time() - runner = object.__new__(GatewayRunner) - runner._repo_root_for_staleness = repo_root - runner._boot_wall_time = boot_wall - runner._boot_git_sha = boot_sha - runner._boot_repo_mtime = boot_mtime - runner._stale_code_notified = set() - runner._stale_code_restart_triggered = False - runner._cached_current_sha = boot_sha - runner._cached_current_sha_at = boot_wall - return runner - - -# --------------------------------------------------------------------------- -# _read_git_head_sha — raw SHA reader -# --------------------------------------------------------------------------- - -def test_read_git_head_sha_branch_ref(tmp_path): - """Resolves ref: refs/heads/ → SHA from refs/heads/.""" - sha = "b" * 40 - _make_git_repo(tmp_path, sha=sha, branch="main") - assert _read_git_head_sha(tmp_path) == sha - - -def test_read_git_head_sha_detached_head(tmp_path): - """Detached HEAD: .git/HEAD contains the SHA directly.""" - sha = "c" * 40 - git_dir = tmp_path / ".git" - git_dir.mkdir() - (git_dir / "HEAD").write_text(f"{sha}\n") - assert _read_git_head_sha(tmp_path) == sha - - -def test_read_git_head_sha_packed_refs(tmp_path): - """Falls back to packed-refs when refs/heads/ is missing.""" - sha = "d" * 40 - git_dir = tmp_path / ".git" - git_dir.mkdir() - (git_dir / "HEAD").write_text("ref: refs/heads/main\n") - # No refs/heads/main file — only packed-refs - (git_dir / "packed-refs").write_text( - f"# pack-refs with: peeled fully-peeled sorted\n" - f"{sha} refs/heads/main\n" - ) - assert _read_git_head_sha(tmp_path) == sha - - -def test_read_git_head_sha_worktree_gitdir_file(tmp_path): - """Worktree: .git is a file with `gitdir: ` pointing to the real git dir. - - Real git worktrees store shared refs (refs/heads/*) in the main - checkout's .git/ and write a ``commondir`` pointer into the - worktree-gitdir. The reader must follow commondir to resolve the - branch ref — this is the layout Hermes dev sessions actually use. - """ - sha = "e" * 40 - # Main repo layout - main_repo = tmp_path / "main-repo" - main_git = main_repo / ".git" - (main_git / "refs" / "heads").mkdir(parents=True) - (main_git / "HEAD").write_text("ref: refs/heads/main\n") - (main_git / "refs" / "heads" / "main").write_text("0" * 40 + "\n") - - # Worktree lives in main-repo/.git/worktrees// - worktree_git_dir = main_git / "worktrees" / "feature" - worktree_git_dir.mkdir(parents=True) - (worktree_git_dir / "HEAD").write_text("ref: refs/heads/feature\n") - # commondir points back at the main .git (relative path, "../..") - (worktree_git_dir / "commondir").write_text("../..\n") - # Feature branch ref lives in the shared refs/heads - (main_git / "refs" / "heads" / "feature").write_text(f"{sha}\n") - - # Worktree checkout with .git file pointing at worktree_git_dir - worktree = tmp_path / "wt" - worktree.mkdir() - (worktree / ".git").write_text(f"gitdir: {worktree_git_dir}\n") - - assert _read_git_head_sha(worktree) == sha - - -def test_read_git_head_sha_worktree_packed_refs_in_common(tmp_path): - """Worktree + packed-refs in common dir: fallback still resolves.""" - sha = "f" * 40 - main_repo = tmp_path / "main-repo" - main_git = main_repo / ".git" - main_git.mkdir(parents=True) - (main_git / "HEAD").write_text("ref: refs/heads/main\n") - # packed-refs in the common (main) .git - (main_git / "packed-refs").write_text( - f"# pack-refs with: peeled fully-peeled sorted\n" - f"{sha} refs/heads/feature\n" - ) - - worktree_git_dir = main_git / "worktrees" / "feature" - worktree_git_dir.mkdir(parents=True) - (worktree_git_dir / "HEAD").write_text("ref: refs/heads/feature\n") - (worktree_git_dir / "commondir").write_text("../..\n") - - worktree = tmp_path / "wt" - worktree.mkdir() - (worktree / ".git").write_text(f"gitdir: {worktree_git_dir}\n") - - assert _read_git_head_sha(worktree) == sha - - -def test_read_git_head_sha_no_git_returns_none(tmp_path): - """No .git dir → None (non-git install, safely disables the check).""" - assert _read_git_head_sha(tmp_path) is None - - -def test_read_git_head_sha_malformed_head_returns_none(tmp_path): - """Empty HEAD file → None (don't loop on corrupt repos).""" - git_dir = tmp_path / ".git" - git_dir.mkdir() - (git_dir / "HEAD").write_text("") - assert _read_git_head_sha(tmp_path) is None - - -# --------------------------------------------------------------------------- -# _detect_stale_code — the main regression guard -# --------------------------------------------------------------------------- - -def test_detect_stale_code_false_when_sha_unchanged(tmp_path): - """Boot SHA == current SHA → not stale (no restart).""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - # Force fresh read by expiring the cache - runner._cached_current_sha_at = 0.0 - assert runner._detect_stale_code() is False - - -def test_detect_stale_code_true_after_git_pull(tmp_path): - """Boot SHA != current SHA → stale (hermes update happened).""" - boot_sha = "a" * 40 - _make_git_repo(tmp_path, sha=boot_sha) - runner = _make_runner(tmp_path, boot_sha=boot_sha) - # Simulate git pull moving HEAD forward - _set_head_sha(tmp_path, "b" * 40) - runner._cached_current_sha_at = 0.0 # expire cache - assert runner._detect_stale_code() is True - - -def test_detect_stale_code_ignores_agent_file_edits(tmp_path): - """THE CORE REGRESSION: agent edits to source files do NOT trigger restart. - - This is the motivating incident for the SHA-based check. Under the - previous mtime-based scheme, any ``patch`` / ``write_file`` call - against run_agent.py / gateway/run.py / hermes_cli/config.py would - flip the stale-check to True and force a gateway restart on the - next message — even though no update actually happened. SHA - comparison decouples the two: git HEAD only moves on ``git pull``, - never on file writes. - """ - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - _make_tmp_repo(tmp_path) # lay down sentinel files too - runner = _make_runner(tmp_path, boot_sha=sha) - - # Simulate the agent editing run_agent.py and gateway/run.py with - # mtimes far into the future — exactly the scenario that used to - # false-positive the old mtime check. - future = time.time() + 10_000 - for rel in _STALE_CODE_SENTINELS: - p = tmp_path / rel - if p.is_file(): - p.write_text("# agent just edited this\n") - os.utime(p, (future, future)) - - # HEAD SHA has NOT moved — check must stay False. - runner._cached_current_sha_at = 0.0 # expire cache - assert runner._detect_stale_code() is False - - -def test_detect_stale_code_false_for_non_git_install(tmp_path): - """Non-git install (no .git dir) → check disabled, never fires.""" - # No .git dir at all; runner's boot_sha is None - runner = _make_runner(tmp_path, boot_sha=None) - # Even if we pretended the current SHA differed, the check should - # short-circuit on boot_sha=None and return False. - assert runner._detect_stale_code() is False - - -def test_detect_stale_code_false_when_no_boot_wall_time(tmp_path): - """No boot snapshot at all → can't tell → not stale (no restart loop).""" - runner = _make_runner(tmp_path, boot_sha="a" * 40, boot_wall=0.0) - assert runner._detect_stale_code() is False - - -def test_detect_stale_code_handles_disappearing_git_dir(tmp_path): - """.git vanishes mid-run → current_sha = None → not stale (don't loop).""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - # Nuke the git dir after boot - import shutil - shutil.rmtree(tmp_path / ".git") - runner._cached_current_sha_at = 0.0 # expire cache - assert runner._detect_stale_code() is False - - -# --------------------------------------------------------------------------- -# SHA cache -# --------------------------------------------------------------------------- - -def test_current_sha_cache_collapses_bursts(tmp_path, monkeypatch): - """Consecutive calls inside the TTL window reuse the cached SHA.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - - read_calls = {"n": 0} - real_reader = _read_git_head_sha - - def counting_reader(repo_root): - read_calls["n"] += 1 - return real_reader(repo_root) - - from gateway import run as run_mod - monkeypatch.setattr(run_mod, "_read_git_head_sha", counting_reader) - - # Force cache expiry so the first call definitely reads - runner._cached_current_sha_at = 0.0 - runner._current_git_sha_cached() - first_count = read_calls["n"] - - # Immediate second/third calls should hit cache (no new read) - runner._current_git_sha_cached() - runner._current_git_sha_cached() - assert read_calls["n"] == first_count - - -def test_current_sha_cache_expires_after_ttl(tmp_path, monkeypatch): - """After _GIT_SHA_CACHE_TTL_SECS elapses, a fresh read happens.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - - read_calls = {"n": 0} - real_reader = _read_git_head_sha - - def counting_reader(repo_root): - read_calls["n"] += 1 - return real_reader(repo_root) - - from gateway import run as run_mod - monkeypatch.setattr(run_mod, "_read_git_head_sha", counting_reader) - - runner._cached_current_sha_at = 0.0 - runner._current_git_sha_cached() - first = read_calls["n"] - - # Age the cache past the TTL - runner._cached_current_sha_at = time.time() - (_GIT_SHA_CACHE_TTL_SECS + 1.0) - runner._current_git_sha_cached() - assert read_calls["n"] == first + 1 - - -# --------------------------------------------------------------------------- -# _trigger_stale_code_restart — idempotency preserved -# --------------------------------------------------------------------------- - -def test_trigger_stale_code_restart_is_idempotent(tmp_path): - """Calling _trigger_stale_code_restart twice only requests restart once.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - - calls = [] - - def fake_request_restart(*, detached=False, via_service=False): - calls.append((detached, via_service)) - return True - - runner.request_restart = fake_request_restart - - runner._trigger_stale_code_restart() - runner._trigger_stale_code_restart() - runner._trigger_stale_code_restart() - - assert len(calls) == 1 - assert runner._stale_code_restart_triggered is True - - -def test_trigger_stale_code_restart_survives_request_failure(tmp_path): - """If request_restart raises, we swallow and mark as triggered anyway.""" - sha = "a" * 40 - _make_git_repo(tmp_path, sha=sha) - runner = _make_runner(tmp_path, boot_sha=sha) - - def boom(*, detached=False, via_service=False): - raise RuntimeError("no event loop") - - runner.request_restart = boom - - # Should not raise - runner._trigger_stale_code_restart() - - # Marked triggered so we don't retry on every subsequent message - assert runner._stale_code_restart_triggered is True - - -# --------------------------------------------------------------------------- -# Class-level defaults — tests that build bare runners via object.__new__ -# --------------------------------------------------------------------------- - -def test_class_level_defaults_prevent_uninitialized_access(): - """Partial construction via object.__new__ must not crash _detect_stale_code.""" - runner = object.__new__(GatewayRunner) - # Don't set any instance attrs — class-level defaults should kick in - runner._repo_root_for_staleness = Path(".") - # _boot_wall_time / _boot_git_sha fall through to class defaults - # (0.0 and None respectively) - assert runner._detect_stale_code() is False - # _stale_code_restart_triggered falls through to class default (False) - assert runner._stale_code_restart_triggered is False - - -# --------------------------------------------------------------------------- -# Legacy mtime reader kept for compatibility — light sanity check only -# --------------------------------------------------------------------------- - -def test_compute_repo_mtime_still_returns_newest(tmp_path): - """_compute_repo_mtime remains available for any legacy callers.""" - repo = _make_tmp_repo(tmp_path) - - baseline = time.time() - 100 - for rel in _STALE_CODE_SENTINELS: - os.utime(repo / rel, (baseline, baseline)) - - newer = time.time() - os.utime(repo / "hermes_cli/config.py", (newer, newer)) - - result = _compute_repo_mtime(repo) - assert abs(result - newer) < 1.0 - - -def test_compute_repo_mtime_missing_files_returns_zero(tmp_path): - """Legacy sanity: missing sentinels → 0.0.""" - assert _compute_repo_mtime(tmp_path) == 0.0