feat(agent): require verification before finishing edits

Make verification closure the default coding behavior after landed file edits while keeping bounded retries and config/env switches for users who need to disable it.
This commit is contained in:
Brooklyn Nicholson 2026-06-24 22:52:26 -05:00
parent f0beb6f617
commit 2f1a47b90e
8 changed files with 405 additions and 4 deletions

View file

@ -4492,9 +4492,10 @@ def run_conversation(
final_msg = agent._build_assistant_message(assistant_message, finish_reason)
# Pop thinking-only prefill and empty-response retry
# scaffolding before appending the final response. These
# internal turns are only for the next API retry and should
# not become durable transcript context.
# scaffolding before appending either a final response or a
# verification-stop follow-up. These internal turns are only
# for the next API retry and should not become durable
# transcript context.
while (
messages
and isinstance(messages[-1], dict)
@ -4506,6 +4507,44 @@ def run_conversation(
):
messages.pop()
try:
from agent.verification_stop import (
build_verify_on_stop_nudge,
verify_on_stop_enabled,
)
if verify_on_stop_enabled():
_verify_nudge = build_verify_on_stop_nudge(
session_id=getattr(agent, "session_id", None),
changed_paths=getattr(agent, "_turn_file_mutation_paths", set()),
attempts=getattr(agent, "_verification_stop_nudges", 0),
)
else:
_verify_nudge = None
except Exception:
logger.debug("verification stop-loop check failed", exc_info=True)
_verify_nudge = None
if _verify_nudge:
agent._verification_stop_nudges = (
getattr(agent, "_verification_stop_nudges", 0) + 1
)
final_msg["finish_reason"] = "verification_required"
messages.append(final_msg)
# Keep the attempted final answer in model history so the
# synthetic user nudge preserves role alternation, but do
# not surface it to the user as an interim answer. The
# whole point of this guard is to prevent premature
# "done" claims before checks run.
messages.append({
"role": "user",
"content": _verify_nudge,
"_verification_stop_synthetic": True,
})
agent._session_messages = messages
agent._emit_status("↻ Verification required before finishing")
continue
messages.append(final_msg)
_turn_exit_reason = f"text_response(finish_reason={finish_reason})"

View file

@ -11,7 +11,8 @@ Pure module-level utilities extracted from ``run_agent.py``:
``_append_subdir_hint_to_multimodal`` envelope helpers for the
``{"_multimodal": True, "content": [...], "text_summary": ...}`` dict
shape returned by tools like ``computer_use``.
* ``_extract_file_mutation_targets`` / ``_extract_error_preview``
* ``_extract_file_mutation_targets`` / ``_extract_landed_file_mutation_paths`` /
``_extract_error_preview``
per-turn file-mutation verifier inputs.
* ``_trajectory_normalize_msg`` strip image blobs from a message for
trajectory saving.
@ -269,6 +270,35 @@ def _extract_file_mutation_targets(tool_name: str, args: Dict[str, Any]) -> List
return []
def _extract_landed_file_mutation_paths(
tool_name: str,
args: Dict[str, Any],
result: Any,
) -> List[str]:
"""Return the concrete file paths a successful mutation reports."""
targets = _extract_file_mutation_targets(tool_name, args)
if tool_name not in _FILE_MUTATING_TOOLS or not isinstance(result, str):
return targets
try:
data = json.loads(result.strip())
except Exception:
return targets
if not isinstance(data, dict):
return targets
files = data.get("files_modified")
if isinstance(files, list):
landed = [str(p) for p in files if p]
if landed:
return landed
resolved = data.get("resolved_path")
if resolved:
return [str(resolved)]
return targets
def _extract_error_preview(result: Any, max_len: int = 180) -> str:
"""Pull a one-line error summary out of a tool result for footer display."""
text = _multimodal_text_summary(result) if result is not None else ""
@ -411,6 +441,7 @@ __all__ = [
"_multimodal_text_summary",
"_append_subdir_hint_to_multimodal",
"_extract_file_mutation_targets",
"_extract_landed_file_mutation_paths",
"_extract_error_preview",
"_trajectory_normalize_msg",
"make_tool_result_message",

View file

@ -427,6 +427,8 @@ def build_turn_context(
# Per-turn file-mutation verifier state.
agent._turn_failed_file_mutations = {}
agent._turn_file_mutation_paths = set()
agent._verification_stop_nudges = 0
# Record the execution thread so interrupt()/clear_interrupt() can scope
# the tool-level interrupt signal to THIS agent's thread only.

153
agent/verification_stop.py Normal file
View file

@ -0,0 +1,153 @@
"""Turn-end verification guard for coding edits.
This module is intentionally policy-only. It never runs checks itself; it turns
the passive verification ledger into a bounded follow-up when the model tries to
finish immediately after editing code without fresh evidence.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Iterable
_MAX_CHANGED_PATHS_IN_NUDGE = 8
def verify_on_stop_enabled(config: dict[str, Any] | None = None) -> bool:
"""Return whether edit -> verify-before-finish behavior is enabled."""
env = os.environ.get("HERMES_VERIFY_ON_STOP")
if env is not None:
return env.strip().lower() not in {"0", "false", "no", "off"}
if config is None:
try:
from hermes_cli.config import load_config
config = load_config()
except Exception:
config = {}
agent_cfg = (config or {}).get("agent") if isinstance(config, dict) else None
if isinstance(agent_cfg, dict) and "verify_on_stop" in agent_cfg:
return bool(agent_cfg.get("verify_on_stop"))
return True
def _candidate_cwds(paths: Iterable[str]) -> list[Path]:
candidates: list[Path] = []
seen: set[str] = set()
for raw in paths:
if not raw:
continue
try:
path = Path(raw).expanduser()
candidate = path if path.is_dir() else path.parent
resolved = str(candidate.resolve())
except Exception:
continue
if resolved not in seen:
seen.add(resolved)
candidates.append(Path(resolved))
return candidates
def _verification_snapshot(
*,
session_id: str | None,
changed_paths: list[str],
) -> tuple[dict[str, Any], dict[str, Any]] | None:
"""Return ``(status, facts)`` for the first edited workspace needing proof."""
try:
from agent.coding_context import project_facts_for
from agent.verification_evidence import verification_status
except Exception:
return None
first_snapshot: tuple[dict[str, Any], dict[str, Any]] | None = None
for cwd in _candidate_cwds(changed_paths):
facts = project_facts_for(cwd)
if not facts:
continue
status = verification_status(session_id=session_id, cwd=cwd)
snapshot = (status, facts)
if first_snapshot is None:
first_snapshot = snapshot
if str(status.get("status") or "unverified") != "passed":
return snapshot
return first_snapshot
def _format_changed_paths(paths: list[str]) -> str:
shown = paths[:_MAX_CHANGED_PATHS_IN_NUDGE]
lines = [f"- `{path}`" for path in shown]
remaining = len(paths) - len(shown)
if remaining > 0:
lines.append(f"- ... and {remaining} more")
return "\n".join(lines)
def _status_detail(status: dict[str, Any]) -> str:
state = str(status.get("status") or "unverified")
evidence = status.get("evidence") if isinstance(status.get("evidence"), dict) else None
if not evidence:
return state
command = evidence.get("canonical_command") or evidence.get("command")
summary = str(evidence.get("output_summary") or "").strip()
parts = [state]
if command:
parts.append(f"last command `{command}`")
if summary:
max_summary = 1200
if len(summary) > max_summary:
summary = summary[:max_summary].rstrip() + "\n... [truncated]"
parts.append(f"last output:\n{summary}")
return "\n".join(parts)
def build_verify_on_stop_nudge(
*,
session_id: str | None,
changed_paths: Iterable[str],
attempts: int = 0,
max_attempts: int = 2,
) -> str | None:
"""Return a synthetic follow-up when edited code lacks fresh verification."""
paths = sorted({str(p) for p in changed_paths if p})
if not paths or attempts >= max_attempts:
return None
snapshot = _verification_snapshot(session_id=session_id, changed_paths=paths)
if snapshot is None:
return None
status, facts = snapshot
verify_commands = [
str(cmd).strip()
for cmd in (facts.get("verifyCommands") or [])
if str(cmd).strip()
]
if not verify_commands:
return None
state = str(status.get("status") or "unverified")
if state == "passed":
return None
command_hint = ", ".join(f"`{cmd}`" for cmd in verify_commands[:3])
if len(verify_commands) > 3:
command_hint += ", ..."
return (
"[System: You edited code in this turn, but the workspace does not have "
"fresh passing verification evidence yet.\n\n"
f"Verification status: {_status_detail(status)}\n\n"
f"Changed paths:\n{_format_changed_paths(paths)}\n\n"
f"Run the relevant verification command now ({command_hint}), read any "
"failure, repair the code, and summarize what passed. If verification "
"is not possible, explain the concrete blocker instead of claiming the "
"work is fully verified.]"
)
__all__ = ["build_verify_on_stop_nudge", "verify_on_stop_enabled"]

View file

@ -979,6 +979,11 @@ DEFAULT_CONFIG = {
# "on" — force the prompt posture everywhere.
# "off" — disable entirely.
"coding_context": "auto",
# Verification closure: after the agent edits files in a code workspace,
# do not accept a final answer until fresh verification evidence exists
# or the agent explains why it cannot run checks. The loop is bounded
# and uses the passive verification ledger. Set false to disable.
"verify_on_stop": True,
# Staged inactivity warning: send a warning to the user at this
# threshold before escalating to a full timeout. The warning fires
# once per run and does not interrupt the agent. 0 = disable warning.

View file

@ -206,6 +206,7 @@ from agent.tool_dispatch_helpers import (
_multimodal_text_summary,
_append_subdir_hint_to_multimodal, # noqa: F401 # re-exported for tests that `from run_agent import _append_subdir_hint_to_multimodal`
_extract_file_mutation_targets,
_extract_landed_file_mutation_paths,
_extract_error_preview,
_trajectory_normalize_msg, # noqa: F401 # re-exported for tests that `from run_agent import _trajectory_normalize_msg`
)
@ -2550,6 +2551,10 @@ class AIAgent:
if not targets:
return
landed = file_mutation_result_landed(tool_name, result)
if landed:
changed = getattr(self, "_turn_file_mutation_paths", None)
if changed is not None:
changed.update(_extract_landed_file_mutation_paths(tool_name, args, result))
if is_error and not landed:
preview = _extract_error_preview(result)
for path in targets:

View file

@ -0,0 +1,139 @@
import json
from pathlib import Path
from agent.verification_evidence import (
mark_workspace_edited,
record_terminal_result,
)
from agent.verification_stop import (
build_verify_on_stop_nudge,
verify_on_stop_enabled,
)
def _node_project(root: Path) -> None:
(root / "package.json").write_text(
json.dumps({"scripts": {"test": "vitest", "lint": "eslint ."}}),
encoding="utf-8",
)
(root / "pnpm-lock.yaml").write_text("", encoding="utf-8")
def _make_project(root: Path) -> None:
root.mkdir()
_node_project(root)
def test_verify_on_stop_default_is_on(monkeypatch):
monkeypatch.delenv("HERMES_VERIFY_ON_STOP", raising=False)
assert verify_on_stop_enabled({"agent": {}}) is True
def test_verify_on_stop_env_can_disable(monkeypatch):
monkeypatch.setenv("HERMES_VERIFY_ON_STOP", "0")
assert verify_on_stop_enabled({"agent": {"verify_on_stop": True}}) is False
def test_verify_on_stop_config_can_disable(monkeypatch):
monkeypatch.delenv("HERMES_VERIFY_ON_STOP", raising=False)
assert verify_on_stop_enabled({"agent": {"verify_on_stop": False}}) is False
def test_no_nudge_after_fresh_pass(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
_node_project(tmp_path)
changed = str(tmp_path / "src" / "app.ts")
record_terminal_result(
command="pnpm test",
cwd=tmp_path,
session_id="s1",
exit_code=0,
output="green",
)
assert build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed]) is None
def test_nudge_checks_all_edited_workspaces(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
project_a = tmp_path / "a"
project_b = tmp_path / "b"
_make_project(project_a)
_make_project(project_b)
changed_a = str(project_a / "src" / "app.ts")
changed_b = str(project_b / "src" / "app.ts")
record_terminal_result(
command="pnpm test",
cwd=project_a,
session_id="s1",
exit_code=0,
output="green",
)
mark_workspace_edited(session_id="s1", cwd=project_b, paths=[changed_b])
nudge = build_verify_on_stop_nudge(
session_id="s1",
changed_paths=[changed_a, changed_b],
)
assert nudge is not None
assert "fresh passing verification evidence" in nudge
def test_nudge_after_unverified_edit_with_known_command(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
_node_project(tmp_path)
changed = str(tmp_path / "src" / "app.ts")
mark_workspace_edited(session_id="s1", cwd=tmp_path, paths=[changed])
nudge = build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed])
assert nudge is not None
assert "fresh passing verification evidence" in nudge
assert "`pnpm run test`" in nudge
assert changed in nudge
def test_nudge_includes_failed_output_summary(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
_node_project(tmp_path)
changed = str(tmp_path / "src" / "app.ts")
record_terminal_result(
command="pnpm test",
cwd=tmp_path,
session_id="s1",
exit_code=1,
output="expected 1 got 2",
)
nudge = build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed])
assert nudge is not None
assert "failed" in nudge
assert "expected 1 got 2" in nudge
assert "repair the code" in nudge
def test_no_nudge_without_canonical_verify_command(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
(tmp_path / "package.json").write_text("{}", encoding="utf-8")
changed = str(tmp_path / "src" / "app.ts")
assert build_verify_on_stop_nudge(session_id="s1", changed_paths=[changed]) is None
def test_nudge_attempts_are_bounded(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
_node_project(tmp_path)
changed = str(tmp_path / "src" / "app.ts")
mark_workspace_edited(session_id="s1", cwd=tmp_path, paths=[changed])
assert build_verify_on_stop_nudge(
session_id="s1",
changed_paths=[changed],
attempts=2,
max_attempts=2,
) is None

View file

@ -28,6 +28,7 @@ from run_agent import (
_FILE_MUTATING_TOOLS,
_extract_error_preview,
_extract_file_mutation_targets,
_extract_landed_file_mutation_paths,
)
@ -128,6 +129,7 @@ def _bare_agent() -> AIAgent:
"""
agent = object.__new__(AIAgent)
agent._turn_failed_file_mutations = {}
agent._turn_file_mutation_paths = set()
return agent
@ -165,6 +167,31 @@ class TestRecordFileMutationResult:
json.dumps({"success": True, "diff": "..."}), is_error=False,
)
assert agent._turn_failed_file_mutations == {}
assert agent._turn_file_mutation_paths == {"/tmp/a.md"}
def test_success_records_landed_paths_for_verify_on_stop(self):
agent = _bare_agent()
agent._record_file_mutation_result(
"write_file",
{"path": "a.py", "content": "print('ok')\n"},
json.dumps({"bytes_written": 12, "files_modified": ["/tmp/project/a.py"]}),
is_error=False,
)
assert agent._turn_file_mutation_paths == {"/tmp/project/a.py"}
def test_landed_paths_prefer_resolved_tool_result(self):
paths = _extract_landed_file_mutation_paths(
"patch",
{"mode": "replace", "path": "src/app.py"},
json.dumps({
"success": True,
"files_modified": ["/tmp/project/src/app.py"],
}),
)
assert paths == ["/tmp/project/src/app.py"]
def test_write_file_with_lint_error_counts_as_landed(self):
agent = _bare_agent()