From 2f1a47b90e60914b61d361f7c4147a11e7ebebbc Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Wed, 24 Jun 2026 22:52:26 -0500 Subject: [PATCH] feat(agent): require verification before finishing edits Make verification closure the default coding behavior after landed file edits while keeping bounded retries and config/env switches for users who need to disable it. --- agent/conversation_loop.py | 45 +++++- agent/tool_dispatch_helpers.py | 33 +++- agent/turn_context.py | 2 + agent/verification_stop.py | 153 ++++++++++++++++++ hermes_cli/config.py | 5 + run_agent.py | 5 + tests/agent/test_verification_stop.py | 139 ++++++++++++++++ .../run_agent/test_file_mutation_verifier.py | 27 ++++ 8 files changed, 405 insertions(+), 4 deletions(-) create mode 100644 agent/verification_stop.py create mode 100644 tests/agent/test_verification_stop.py diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index 303752aa427..b647e39c980 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -4492,9 +4492,10 @@ def run_conversation( final_msg = agent._build_assistant_message(assistant_message, finish_reason) # Pop thinking-only prefill and empty-response retry - # scaffolding before appending the final response. These - # internal turns are only for the next API retry and should - # not become durable transcript context. + # scaffolding before appending either a final response or a + # verification-stop follow-up. These internal turns are only + # for the next API retry and should not become durable + # transcript context. while ( messages and isinstance(messages[-1], dict) @@ -4506,6 +4507,44 @@ def run_conversation( ): messages.pop() + try: + from agent.verification_stop import ( + build_verify_on_stop_nudge, + verify_on_stop_enabled, + ) + + if verify_on_stop_enabled(): + _verify_nudge = build_verify_on_stop_nudge( + session_id=getattr(agent, "session_id", None), + changed_paths=getattr(agent, "_turn_file_mutation_paths", set()), + attempts=getattr(agent, "_verification_stop_nudges", 0), + ) + else: + _verify_nudge = None + except Exception: + logger.debug("verification stop-loop check failed", exc_info=True) + _verify_nudge = None + + if _verify_nudge: + agent._verification_stop_nudges = ( + getattr(agent, "_verification_stop_nudges", 0) + 1 + ) + final_msg["finish_reason"] = "verification_required" + messages.append(final_msg) + # Keep the attempted final answer in model history so the + # synthetic user nudge preserves role alternation, but do + # not surface it to the user as an interim answer. The + # whole point of this guard is to prevent premature + # "done" claims before checks run. + messages.append({ + "role": "user", + "content": _verify_nudge, + "_verification_stop_synthetic": True, + }) + agent._session_messages = messages + agent._emit_status("↻ Verification required before finishing") + continue + messages.append(final_msg) _turn_exit_reason = f"text_response(finish_reason={finish_reason})" diff --git a/agent/tool_dispatch_helpers.py b/agent/tool_dispatch_helpers.py index a0f3bfc2683..2cdcff7d714 100644 --- a/agent/tool_dispatch_helpers.py +++ b/agent/tool_dispatch_helpers.py @@ -11,7 +11,8 @@ Pure module-level utilities extracted from ``run_agent.py``: ``_append_subdir_hint_to_multimodal`` — envelope helpers for the ``{"_multimodal": True, "content": [...], "text_summary": ...}`` dict shape returned by tools like ``computer_use``. -* ``_extract_file_mutation_targets`` / ``_extract_error_preview`` — +* ``_extract_file_mutation_targets`` / ``_extract_landed_file_mutation_paths`` / + ``_extract_error_preview`` — per-turn file-mutation verifier inputs. * ``_trajectory_normalize_msg`` — strip image blobs from a message for trajectory saving. @@ -269,6 +270,35 @@ def _extract_file_mutation_targets(tool_name: str, args: Dict[str, Any]) -> List return [] +def _extract_landed_file_mutation_paths( + tool_name: str, + args: Dict[str, Any], + result: Any, +) -> List[str]: + """Return the concrete file paths a successful mutation reports.""" + targets = _extract_file_mutation_targets(tool_name, args) + if tool_name not in _FILE_MUTATING_TOOLS or not isinstance(result, str): + return targets + try: + data = json.loads(result.strip()) + except Exception: + return targets + if not isinstance(data, dict): + return targets + + files = data.get("files_modified") + if isinstance(files, list): + landed = [str(p) for p in files if p] + if landed: + return landed + + resolved = data.get("resolved_path") + if resolved: + return [str(resolved)] + + return targets + + def _extract_error_preview(result: Any, max_len: int = 180) -> str: """Pull a one-line error summary out of a tool result for footer display.""" text = _multimodal_text_summary(result) if result is not None else "" @@ -411,6 +441,7 @@ __all__ = [ "_multimodal_text_summary", "_append_subdir_hint_to_multimodal", "_extract_file_mutation_targets", + "_extract_landed_file_mutation_paths", "_extract_error_preview", "_trajectory_normalize_msg", "make_tool_result_message", diff --git a/agent/turn_context.py b/agent/turn_context.py index 8b81a45bd22..d7f376d1c4a 100644 --- a/agent/turn_context.py +++ b/agent/turn_context.py @@ -427,6 +427,8 @@ def build_turn_context( # Per-turn file-mutation verifier state. agent._turn_failed_file_mutations = {} + agent._turn_file_mutation_paths = set() + agent._verification_stop_nudges = 0 # Record the execution thread so interrupt()/clear_interrupt() can scope # the tool-level interrupt signal to THIS agent's thread only. diff --git a/agent/verification_stop.py b/agent/verification_stop.py new file mode 100644 index 00000000000..80cb4aa9a3e --- /dev/null +++ b/agent/verification_stop.py @@ -0,0 +1,153 @@ +"""Turn-end verification guard for coding edits. + +This module is intentionally policy-only. It never runs checks itself; it turns +the passive verification ledger into a bounded follow-up when the model tries to +finish immediately after editing code without fresh evidence. +""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any, Iterable + + +_MAX_CHANGED_PATHS_IN_NUDGE = 8 + + +def verify_on_stop_enabled(config: dict[str, Any] | None = None) -> bool: + """Return whether edit -> verify-before-finish behavior is enabled.""" + env = os.environ.get("HERMES_VERIFY_ON_STOP") + if env is not None: + return env.strip().lower() not in {"0", "false", "no", "off"} + if config is None: + try: + from hermes_cli.config import load_config + + config = load_config() + except Exception: + config = {} + agent_cfg = (config or {}).get("agent") if isinstance(config, dict) else None + if isinstance(agent_cfg, dict) and "verify_on_stop" in agent_cfg: + return bool(agent_cfg.get("verify_on_stop")) + return True + + +def _candidate_cwds(paths: Iterable[str]) -> list[Path]: + candidates: list[Path] = [] + seen: set[str] = set() + for raw in paths: + if not raw: + continue + try: + path = Path(raw).expanduser() + candidate = path if path.is_dir() else path.parent + resolved = str(candidate.resolve()) + except Exception: + continue + if resolved not in seen: + seen.add(resolved) + candidates.append(Path(resolved)) + return candidates + + +def _verification_snapshot( + *, + session_id: str | None, + changed_paths: list[str], +) -> tuple[dict[str, Any], dict[str, Any]] | None: + """Return ``(status, facts)`` for the first edited workspace needing proof.""" + try: + from agent.coding_context import project_facts_for + from agent.verification_evidence import verification_status + except Exception: + return None + + first_snapshot: tuple[dict[str, Any], dict[str, Any]] | None = None + for cwd in _candidate_cwds(changed_paths): + facts = project_facts_for(cwd) + if not facts: + continue + status = verification_status(session_id=session_id, cwd=cwd) + snapshot = (status, facts) + if first_snapshot is None: + first_snapshot = snapshot + if str(status.get("status") or "unverified") != "passed": + return snapshot + return first_snapshot + + +def _format_changed_paths(paths: list[str]) -> str: + shown = paths[:_MAX_CHANGED_PATHS_IN_NUDGE] + lines = [f"- `{path}`" for path in shown] + remaining = len(paths) - len(shown) + if remaining > 0: + lines.append(f"- ... and {remaining} more") + return "\n".join(lines) + + +def _status_detail(status: dict[str, Any]) -> str: + state = str(status.get("status") or "unverified") + evidence = status.get("evidence") if isinstance(status.get("evidence"), dict) else None + if not evidence: + return state + + command = evidence.get("canonical_command") or evidence.get("command") + summary = str(evidence.get("output_summary") or "").strip() + parts = [state] + if command: + parts.append(f"last command `{command}`") + if summary: + max_summary = 1200 + if len(summary) > max_summary: + summary = summary[:max_summary].rstrip() + "\n... [truncated]" + parts.append(f"last output:\n{summary}") + return "\n".join(parts) + + +def build_verify_on_stop_nudge( + *, + session_id: str | None, + changed_paths: Iterable[str], + attempts: int = 0, + max_attempts: int = 2, +) -> str | None: + """Return a synthetic follow-up when edited code lacks fresh verification.""" + paths = sorted({str(p) for p in changed_paths if p}) + if not paths or attempts >= max_attempts: + return None + + snapshot = _verification_snapshot(session_id=session_id, changed_paths=paths) + if snapshot is None: + return None + status, facts = snapshot + + verify_commands = [ + str(cmd).strip() + for cmd in (facts.get("verifyCommands") or []) + if str(cmd).strip() + ] + if not verify_commands: + return None + + state = str(status.get("status") or "unverified") + if state == "passed": + return None + + command_hint = ", ".join(f"`{cmd}`" for cmd in verify_commands[:3]) + if len(verify_commands) > 3: + command_hint += ", ..." + + return ( + "[System: You edited code in this turn, but the workspace does not have " + "fresh passing verification evidence yet.\n\n" + f"Verification status: {_status_detail(status)}\n\n" + f"Changed paths:\n{_format_changed_paths(paths)}\n\n" + f"Run the relevant verification command now ({command_hint}), read any " + "failure, repair the code, and summarize what passed. If verification " + "is not possible, explain the concrete blocker instead of claiming the " + "work is fully verified.]" + ) + + +__all__ = ["build_verify_on_stop_nudge", "verify_on_stop_enabled"] diff --git a/hermes_cli/config.py b/hermes_cli/config.py index d250f25bd0d..578e9302078 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -979,6 +979,11 @@ DEFAULT_CONFIG = { # "on" — force the prompt posture everywhere. # "off" — disable entirely. "coding_context": "auto", + # Verification closure: after the agent edits files in a code workspace, + # do not accept a final answer until fresh verification evidence exists + # or the agent explains why it cannot run checks. The loop is bounded + # and uses the passive verification ledger. Set false to disable. + "verify_on_stop": True, # Staged inactivity warning: send a warning to the user at this # threshold before escalating to a full timeout. The warning fires # once per run and does not interrupt the agent. 0 = disable warning. diff --git a/run_agent.py b/run_agent.py index 63050980934..8432b0b3db1 100644 --- a/run_agent.py +++ b/run_agent.py @@ -206,6 +206,7 @@ from agent.tool_dispatch_helpers import ( _multimodal_text_summary, _append_subdir_hint_to_multimodal, # noqa: F401 # re-exported for tests that `from run_agent import _append_subdir_hint_to_multimodal` _extract_file_mutation_targets, + _extract_landed_file_mutation_paths, _extract_error_preview, _trajectory_normalize_msg, # noqa: F401 # re-exported for tests that `from run_agent import _trajectory_normalize_msg` ) @@ -2550,6 +2551,10 @@ class AIAgent: if not targets: return landed = file_mutation_result_landed(tool_name, result) + if landed: + changed = getattr(self, "_turn_file_mutation_paths", None) + if changed is not None: + changed.update(_extract_landed_file_mutation_paths(tool_name, args, result)) if is_error and not landed: preview = _extract_error_preview(result) for path in targets: diff --git a/tests/agent/test_verification_stop.py b/tests/agent/test_verification_stop.py new file mode 100644 index 00000000000..a325a434eb1 --- /dev/null +++ b/tests/agent/test_verification_stop.py @@ -0,0 +1,139 @@ +import json +from pathlib import Path + +from agent.verification_evidence import ( + mark_workspace_edited, + record_terminal_result, +) +from agent.verification_stop import ( + build_verify_on_stop_nudge, + verify_on_stop_enabled, +) + + +def _node_project(root: Path) -> None: + (root / "package.json").write_text( + json.dumps({"scripts": {"test": "vitest", "lint": "eslint ."}}), + encoding="utf-8", + ) + (root / "pnpm-lock.yaml").write_text("", encoding="utf-8") + + +def _make_project(root: Path) -> None: + root.mkdir() + _node_project(root) + + +def test_verify_on_stop_default_is_on(monkeypatch): + monkeypatch.delenv("HERMES_VERIFY_ON_STOP", raising=False) + assert verify_on_stop_enabled({"agent": {}}) is True + + +def test_verify_on_stop_env_can_disable(monkeypatch): + monkeypatch.setenv("HERMES_VERIFY_ON_STOP", "0") + assert verify_on_stop_enabled({"agent": {"verify_on_stop": True}}) is False + + +def test_verify_on_stop_config_can_disable(monkeypatch): + monkeypatch.delenv("HERMES_VERIFY_ON_STOP", raising=False) + assert verify_on_stop_enabled({"agent": {"verify_on_stop": False}}) is False + + +def test_no_nudge_after_fresh_pass(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes")) + _node_project(tmp_path) + changed = str(tmp_path / "src" / "app.ts") + + record_terminal_result( + command="pnpm test", + cwd=tmp_path, + session_id="s1", + exit_code=0, + output="green", + ) + + assert build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed]) is None + + +def test_nudge_checks_all_edited_workspaces(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes")) + project_a = tmp_path / "a" + project_b = tmp_path / "b" + _make_project(project_a) + _make_project(project_b) + changed_a = str(project_a / "src" / "app.ts") + changed_b = str(project_b / "src" / "app.ts") + + record_terminal_result( + command="pnpm test", + cwd=project_a, + session_id="s1", + exit_code=0, + output="green", + ) + mark_workspace_edited(session_id="s1", cwd=project_b, paths=[changed_b]) + + nudge = build_verify_on_stop_nudge( + session_id="s1", + changed_paths=[changed_a, changed_b], + ) + + assert nudge is not None + assert "fresh passing verification evidence" in nudge + + +def test_nudge_after_unverified_edit_with_known_command(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes")) + _node_project(tmp_path) + changed = str(tmp_path / "src" / "app.ts") + mark_workspace_edited(session_id="s1", cwd=tmp_path, paths=[changed]) + + nudge = build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed]) + + assert nudge is not None + assert "fresh passing verification evidence" in nudge + assert "`pnpm run test`" in nudge + assert changed in nudge + + +def test_nudge_includes_failed_output_summary(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes")) + _node_project(tmp_path) + changed = str(tmp_path / "src" / "app.ts") + + record_terminal_result( + command="pnpm test", + cwd=tmp_path, + session_id="s1", + exit_code=1, + output="expected 1 got 2", + ) + + nudge = build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed]) + + assert nudge is not None + assert "failed" in nudge + assert "expected 1 got 2" in nudge + assert "repair the code" in nudge + + +def test_no_nudge_without_canonical_verify_command(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes")) + (tmp_path / "package.json").write_text("{}", encoding="utf-8") + changed = str(tmp_path / "src" / "app.ts") + + assert build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed]) is None + + +def test_nudge_attempts_are_bounded(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes")) + _node_project(tmp_path) + changed = str(tmp_path / "src" / "app.ts") + mark_workspace_edited(session_id="s1", cwd=tmp_path, paths=[changed]) + + assert build_verify_on_stop_nudge( + session_id="s1", + changed_paths=[changed], + attempts=2, + max_attempts=2, + ) is None diff --git a/tests/run_agent/test_file_mutation_verifier.py b/tests/run_agent/test_file_mutation_verifier.py index 5d6b9d7c0dc..578c846e658 100644 --- a/tests/run_agent/test_file_mutation_verifier.py +++ b/tests/run_agent/test_file_mutation_verifier.py @@ -28,6 +28,7 @@ from run_agent import ( _FILE_MUTATING_TOOLS, _extract_error_preview, _extract_file_mutation_targets, + _extract_landed_file_mutation_paths, ) @@ -128,6 +129,7 @@ def _bare_agent() -> AIAgent: """ agent = object.__new__(AIAgent) agent._turn_failed_file_mutations = {} + agent._turn_file_mutation_paths = set() return agent @@ -165,6 +167,31 @@ class TestRecordFileMutationResult: json.dumps({"success": True, "diff": "..."}), is_error=False, ) assert agent._turn_failed_file_mutations == {} + assert agent._turn_file_mutation_paths == {"/tmp/a.md"} + + def test_success_records_landed_paths_for_verify_on_stop(self): + agent = _bare_agent() + + agent._record_file_mutation_result( + "write_file", + {"path": "a.py", "content": "print('ok')\n"}, + json.dumps({"bytes_written": 12, "files_modified": ["/tmp/project/a.py"]}), + is_error=False, + ) + + assert agent._turn_file_mutation_paths == {"/tmp/project/a.py"} + + def test_landed_paths_prefer_resolved_tool_result(self): + paths = _extract_landed_file_mutation_paths( + "patch", + {"mode": "replace", "path": "src/app.py"}, + json.dumps({ + "success": True, + "files_modified": ["/tmp/project/src/app.py"], + }), + ) + + assert paths == ["/tmp/project/src/app.py"] def test_write_file_with_lint_error_counts_as_landed(self): agent = _bare_agent()