From fcbdf3c3568bc0563c0af5b444c6e5bd6842bf02 Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Wed, 24 Jun 2026 22:35:27 -0500 Subject: [PATCH] feat(agent): record coding verification evidence Record foreground verification commands in a bounded, profile-scoped ledger and mark evidence stale when code edits change the workspace. --- agent/verification_evidence.py | 547 +++++++++++++++++++++++++++++++++ tools/file_tools.py | 46 ++- tools/terminal_tool.py | 34 +- 3 files changed, 620 insertions(+), 7 deletions(-) create mode 100644 agent/verification_evidence.py diff --git a/agent/verification_evidence.py b/agent/verification_evidence.py new file mode 100644 index 00000000000..f86b115c060 --- /dev/null +++ b/agent/verification_evidence.py @@ -0,0 +1,547 @@ +"""Coding verification evidence ledger. + +This module records what the agent actually proved while working in a code +workspace. It is deliberately passive: it never decides to run a suite, never +blocks completion, and never upgrades targeted checks into "repo green". 
+""" + +from __future__ import annotations + +import json +import re +import shlex +import sqlite3 +import threading +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Optional + +from hermes_constants import get_hermes_home + + +_DB_LOCK = threading.Lock() +_MAX_OUTPUT_SUMMARY_CHARS = 2000 +_MAX_EVIDENCE_AGE_DAYS = 30 +_MAX_EVENTS_PER_SESSION_ROOT = 100 +_MAX_TOTAL_UNREFERENCED_EVENTS = 10_000 +_VERIFY_SCHEMA_VERSION = 1 +_SHELL_SPLIT_RE = re.compile(r"\s*(?:&&|\|\||;)\s*") + + +@dataclass(frozen=True) +class VerificationEvidence: + """A classified command result worth recording.""" + + command: str + canonical_command: str + kind: str + scope: str + status: str + exit_code: int + cwd: str + root: str + session_id: str + output_summary: str = "" + + +def _utc_now() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _retention_cutoff() -> str: + return (datetime.now(timezone.utc) - timedelta(days=_MAX_EVIDENCE_AGE_DAYS)).isoformat() + + +def _db_path() -> Path: + return get_hermes_home() / "verification_evidence.db" + + +def _connect() -> sqlite3.Connection: + path = _db_path() + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(path) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=5000") + conn.row_factory = sqlite3.Row + _ensure_schema(conn) + return conn + + +def _ensure_schema(conn: sqlite3.Connection) -> None: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS verification_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created_at TEXT NOT NULL, + session_id TEXT NOT NULL, + cwd TEXT NOT NULL, + root TEXT NOT NULL, + command TEXT NOT NULL, + canonical_command TEXT NOT NULL, + kind TEXT NOT NULL, + scope TEXT NOT NULL, + status TEXT NOT NULL, + exit_code INTEGER NOT NULL, + output_summary 
TEXT NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS verification_state ( + session_id TEXT NOT NULL, + root TEXT NOT NULL, + last_event_id INTEGER, + last_edit_at TEXT, + changed_paths_json TEXT NOT NULL DEFAULT '[]', + PRIMARY KEY (session_id, root) + ) + """ + ) + conn.execute( + """ + CREATE INDEX IF NOT EXISTS idx_verification_events_session_root + ON verification_events(session_id, root, id DESC) + """ + ) + conn.execute( + "INSERT OR REPLACE INTO meta(key, value) VALUES ('schema_version', ?)", + (str(_VERIFY_SCHEMA_VERSION),), + ) + conn.commit() + + +def _split_segment_tokens(command: str) -> list[list[str]]: + segments: list[list[str]] = [] + for segment in _SHELL_SPLIT_RE.split(command.strip()): + if not segment: + continue + try: + tokens = shlex.split(segment) + except ValueError: + continue + if tokens: + segments.append(tokens) + return segments + + +def _clean_token(token: str) -> str: + token = token.strip() + while token.startswith("./"): + token = token[2:] + return token + + +def _canonical_tokens(canonical: str) -> list[str]: + try: + return [_clean_token(t) for t in shlex.split(canonical) if t] + except ValueError: + return [] + + +def _find_subsequence(tokens: list[str], needle: list[str]) -> Optional[int]: + if not tokens or not needle or len(needle) > len(tokens): + return None + cleaned = [_clean_token(t) for t in tokens] + for idx in range(0, len(cleaned) - len(needle) + 1): + if cleaned[idx:idx + len(needle)] == needle: + return idx + return None + + +def _strip_command_prefix(tokens: list[str]) -> list[str]: + """Remove harmless command prefixes before matching canonical commands.""" + remaining = list(tokens) + if remaining and remaining[0] == "env": + remaining = remaining[1:] + while remaining and "=" in remaining[0] and not remaining[0].startswith("-"): + remaining = remaining[1:] + while remaining and remaining[0] in {"command", "time", "noglob"}: + remaining = remaining[1:] + return remaining + + +def 
_equivalent_needles(needle: list[str]) -> list[list[str]]: + """Return command spellings equivalent to the detected canonical command.""" + candidates = [needle] + if len(needle) >= 3 and needle[1] == "run": + package_manager = needle[0] + script_name = needle[2] + if package_manager in {"npm", "pnpm", "yarn", "bun"}: + candidates.append([package_manager, script_name]) + if len(needle) == 1 and "/" in needle[0]: + candidates.extend([["bash", needle[0]], ["sh", needle[0]]]) + if needle == ["pytest"]: + candidates.extend( + [ + ["python", "-m", "pytest"], + ["python3", "-m", "pytest"], + ["uv", "run", "pytest"], + ["poetry", "run", "pytest"], + ["pipenv", "run", "pytest"], + ] + ) + return candidates + + +def _find_canonical_match(command: str, canonical_commands: list[str]) -> Optional[tuple[str, list[str]]]: + """Return ``(canonical, trailing_args)`` for the first detected command.""" + + segments = _split_segment_tokens(command) + for canonical in canonical_commands: + needle = _canonical_tokens(canonical) + if not needle: + continue + for tokens in segments: + candidate_tokens = _strip_command_prefix(tokens) + for candidate in _equivalent_needles(needle): + if candidate_tokens[:len(candidate)] == candidate: + return canonical, candidate_tokens[len(candidate):] + return None + + +def _kind_for_command(canonical: str) -> str: + lowered = canonical.lower() + if any(word in lowered for word in ("lint", "eslint", "ruff")): + return "lint" + if any(word in lowered for word in ("typecheck", "tsc", "mypy", "pyright", "ty")): + return "typecheck" + if "build" in lowered: + return "build" + if "fmt" in lowered or "format" in lowered: + return "format" + if "check" in lowered and "test" not in lowered: + return "check" + return "test" + + +def _looks_like_target(arg: str) -> bool: + if not arg or arg.startswith("-") or "=" in arg: + return False + return ( + "/" in arg + or "\\" in arg + or "::" in arg + or arg.endswith((".py", ".js", ".jsx", ".ts", ".tsx", ".rs", ".go", 
def _scope_for_args(args: list[str]) -> str:
    """Return ``"targeted"`` if any trailing argument selects a path or test, else ``"full"``."""
    for arg in args:
        if _looks_like_target(arg):
            return "targeted"
    return "full"


def _summarize_output(output: str) -> str:
    """Return stripped *output*, eliding the middle when it exceeds the summary cap.

    When too long, the first third of the budget and the remaining budget from
    the end are kept, joined by a marker stating how many characters were cut.
    """
    text = (output or "").strip()
    limit = _MAX_OUTPUT_SUMMARY_CHARS
    if len(text) <= limit:
        return text
    head_len = limit // 3
    tail_len = limit - head_len
    omitted = len(text) - limit
    marker = f"\n... [{omitted} chars omitted] ...\n"
    return "".join((text[:head_len], marker, text[-tail_len:]))


def _prune_old_events(conn: sqlite3.Connection, *, session_id: str, root: str) -> None:
    """Bound ledger growth without deleting the current state pointer.

    Runs four deletes in order; the caller is expected to commit:

    1. keep only the newest events for this (session_id, root);
    2. drop state rows whose last edit, or whose referenced event when there
       is no edit, is older than the retention cutoff;
    3. drop expired events that no state row still references;
    4. enforce the global cap on events that no state row references.
    """
    cutoff = _retention_cutoff()

    # 1. Per-session/root cap.
    per_scope_params = (session_id, root, session_id, root, _MAX_EVENTS_PER_SESSION_ROOT)
    conn.execute(
        """
        DELETE FROM verification_events
        WHERE session_id = ?
          AND root = ?
          AND id NOT IN (
              SELECT id FROM verification_events
              WHERE session_id = ? AND root = ?
              ORDER BY id DESC
              LIMIT ?
          )
        """,
        per_scope_params,
    )

    # 2. Expired state rows.
    conn.execute(
        """
        DELETE FROM verification_state
        WHERE (
            last_edit_at IS NOT NULL
            AND last_edit_at < ?
        )
        OR (
            last_edit_at IS NULL
            AND last_event_id IN (
                SELECT id FROM verification_events
                WHERE created_at < ?
            )
        )
        """,
        (cutoff, cutoff),
    )

    # 3. Expired events that are no longer referenced.
    conn.execute(
        """
        DELETE FROM verification_events
        WHERE created_at < ?
          AND id NOT IN (
              SELECT last_event_id FROM verification_state
              WHERE last_event_id IS NOT NULL
          )
        """,
        (cutoff,),
    )

    # 4. Global cap on unreferenced events.
    conn.execute(
        """
        DELETE FROM verification_events
        WHERE id NOT IN (
            SELECT id FROM verification_events
            ORDER BY id DESC
            LIMIT ?
        )
        AND id NOT IN (
            SELECT last_event_id FROM verification_state
            WHERE last_event_id IS NOT NULL
        )
        """,
        (_MAX_TOTAL_UNREFERENCED_EVENTS,),
    )
def classify_verification_command(
    command: str,
    *,
    cwd: str | Path | None = None,
    session_id: str | None = None,
    exit_code: int = 0,
    output: str = "",
) -> Optional[VerificationEvidence]:
    """Classify a terminal command as verification evidence, if applicable.

    Returns ``None`` when *command* is empty or not a string, when no project
    facts are available for *cwd*, or when the command matches none of the
    project's ``verifyCommands``. Otherwise returns the evidence record with
    ``status`` derived from *exit_code* (0 is ``"passed"``, anything else is
    ``"failed"``).
    """

    if not command or not isinstance(command, str):
        return None
    try:
        from agent.coding_context import project_facts_for

        facts = project_facts_for(cwd)
    except Exception:
        # Best effort: evidence recording must never break the caller.
        facts = None
    if not facts:
        return None

    verify_commands = list(facts.get("verifyCommands") or [])
    match = _find_canonical_match(command, verify_commands)
    if match is None:
        return None

    canonical, trailing_args = match
    code = int(exit_code)
    resolved_cwd = Path(cwd or ".").resolve()
    # NOTE(review): exit_code is whatever the caller observed for the whole
    # shell line. For lines such as ``pytest || true`` or ``pytest | tail``
    # that may not be the matched command's own exit status -- confirm whether
    # such lines should be recorded as "passed".
    return VerificationEvidence(
        command=command,
        canonical_command=canonical,
        kind=_kind_for_command(canonical),
        scope=_scope_for_args(trailing_args),
        status="passed" if code == 0 else "failed",
        exit_code=code,
        cwd=str(resolved_cwd),
        root=str(facts.get("root") or resolved_cwd),
        session_id=str(session_id or "default"),
        output_summary=_summarize_output(output),
    )


def record_terminal_result(
    *,
    command: str,
    cwd: str | Path | None,
    session_id: str | None,
    exit_code: int,
    output: str = "",
) -> Optional[dict[str, Any]]:
    """Record a foreground terminal result when it is verification evidence.

    Inserts the event, points the (session_id, root) state row at it while
    clearing the pending-edit marker and changed paths, then prunes old rows.
    Returns the stored evidence fields plus ``id`` and ``created_at``, or
    ``None`` when the command is not verification evidence.

    Raises:
        RuntimeError: if the insert does not report a row id.
    """
    from contextlib import closing

    evidence = classify_verification_command(
        command,
        cwd=cwd,
        session_id=session_id,
        exit_code=exit_code,
        output=output,
    )
    if evidence is None:
        return None

    created_at = _utc_now()
    # ``with conn`` only commits or rolls back; ``closing`` is what actually
    # releases the connection once the transaction is finished.
    with _DB_LOCK, closing(_connect()) as conn, conn:
        cur = conn.execute(
            """
            INSERT INTO verification_events(
                created_at, session_id, cwd, root, command, canonical_command,
                kind, scope, status, exit_code, output_summary
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                created_at,
                evidence.session_id,
                evidence.cwd,
                evidence.root,
                evidence.command,
                evidence.canonical_command,
                evidence.kind,
                evidence.scope,
                evidence.status,
                evidence.exit_code,
                evidence.output_summary,
            ),
        )
        if cur.lastrowid is None:
            raise RuntimeError("verification event insert did not return an id")
        event_id = int(cur.lastrowid)
        conn.execute(
            """
            INSERT INTO verification_state(
                session_id, root, last_event_id, last_edit_at, changed_paths_json
            ) VALUES (?, ?, ?, NULL, '[]')
            ON CONFLICT(session_id, root) DO UPDATE SET
                last_event_id = excluded.last_event_id,
                last_edit_at = NULL,
                changed_paths_json = '[]'
            """,
            (evidence.session_id, evidence.root, event_id),
        )
        _prune_old_events(conn, session_id=evidence.session_id, root=evidence.root)
        conn.commit()

    return {"id": event_id, **evidence.__dict__, "created_at": created_at}
def mark_workspace_edited(
    *,
    session_id: str | None,
    cwd: str | Path | None,
    paths: list[str] | tuple[str, ...] | None = None,
) -> Optional[dict[str, Any]]:
    """Mark verification evidence stale after a successful file edit.

    Stamps the (session_id, root) state row with the edit time and merges
    *paths* into its stored changed-path list (sorted, capped at 200 entries).
    An existing ``last_event_id`` is left untouched. Returns ``None`` when no
    project facts are available for *cwd*; otherwise a dict describing the
    edit, whose ``changed_paths`` holds only the paths passed to this call.
    """
    from contextlib import closing

    try:
        from agent.coding_context import project_facts_for

        facts = project_facts_for(cwd)
    except Exception:
        # Best effort: a failed lookup simply means nothing is recorded.
        facts = None
    if not facts:
        return None

    sid = str(session_id or "default")
    root = str(facts.get("root") or Path(cwd or ".").resolve())
    changed_paths = sorted({str(p) for p in (paths or []) if p})
    edited_at = _utc_now()

    # ``with conn`` only manages the transaction; ``closing`` releases the
    # connection afterwards.
    with _DB_LOCK, closing(_connect()) as conn, conn:
        row = conn.execute(
            """
            SELECT changed_paths_json FROM verification_state
            WHERE session_id = ? AND root = ?
            """,
            (sid, root),
        ).fetchone()
        existing: set[str] = set()
        if row is not None:
            try:
                loaded = json.loads(row["changed_paths_json"] or "[]")
            except (TypeError, ValueError):
                loaded = []
            # Ignore anything that is not a list of strings so that a damaged
            # row cannot break the sort below.
            if isinstance(loaded, list):
                existing = {p for p in loaded if isinstance(p, str)}
        merged = sorted(existing | set(changed_paths))[-200:]
        conn.execute(
            """
            INSERT INTO verification_state(
                session_id, root, last_event_id, last_edit_at, changed_paths_json
            ) VALUES (?, ?, NULL, ?, ?)
            ON CONFLICT(session_id, root) DO UPDATE SET
                last_edit_at = excluded.last_edit_at,
                changed_paths_json = excluded.changed_paths_json
            """,
            (sid, root, edited_at, json.dumps(merged)),
        )
        conn.commit()

    return {"session_id": sid, "root": root, "last_edit_at": edited_at, "changed_paths": changed_paths}
def verification_status(
    *,
    session_id: str | None,
    cwd: str | Path | None,
) -> dict[str, Any]:
    """Return the best known verification state for a session/workspace.

    ``status`` is one of:

    * ``"not_applicable"`` -- no project facts are available for *cwd*;
    * ``"unverified"`` -- no state row, or no recorded event, exists;
    * ``"stale"`` -- an edit was recorded after the latest event;
    * otherwise the latest event's own status.

    Every result except ``"not_applicable"`` also carries ``root``,
    ``session_id`` and ``changed_paths``.
    """
    from contextlib import closing

    try:
        from agent.coding_context import project_facts_for

        facts = project_facts_for(cwd)
    except Exception:
        # Best effort: treat a failed lookup as "no project here".
        facts = None
    if not facts:
        return {"status": "not_applicable", "evidence": None}

    sid = str(session_id or "default")
    root = str(facts.get("root") or Path(cwd or ".").resolve())
    # ``with conn`` only manages the transaction; ``closing`` releases the
    # connection. Rows are copied into dicts before the connection goes away.
    with _DB_LOCK, closing(_connect()) as conn, conn:
        state_row = conn.execute(
            """
            SELECT last_event_id, last_edit_at, changed_paths_json
            FROM verification_state
            WHERE session_id = ? AND root = ?
            """,
            (sid, root),
        ).fetchone()
        if state_row is None:
            return {
                "status": "unverified",
                "evidence": None,
                "root": root,
                "session_id": sid,
                "changed_paths": [],
            }
        state = dict(state_row)
        evidence = None
        if state["last_event_id"] is not None:
            event_row = conn.execute(
                "SELECT * FROM verification_events WHERE id = ?",
                (state["last_event_id"],),
            ).fetchone()
            if event_row is not None:
                evidence = dict(event_row)

    try:
        changed_paths = json.loads(state["changed_paths_json"] or "[]")
    except (TypeError, ValueError):
        changed_paths = []
    if not isinstance(changed_paths, list):
        # Callers expect a list; discard any other JSON value.
        changed_paths = []

    if evidence is None:
        return {
            "status": "unverified",
            "evidence": None,
            "root": root,
            "session_id": sid,
            "changed_paths": changed_paths,
        }

    # Both timestamps are ISO-8601 UTC strings written by this module, so
    # they are compared as text.
    if state["last_edit_at"] and state["last_edit_at"] > evidence["created_at"]:
        status = "stale"
    else:
        status = evidence["status"]
    return {
        "status": status,
        "evidence": evidence,
        "root": root,
        "session_id": sid,
        "changed_paths": changed_paths,
    }
+ """, + (sid, root), + ).fetchone() + if state is None: + return { + "status": "unverified", + "evidence": None, + "root": root, + "session_id": sid, + "changed_paths": [], + } + event = None + if state["last_event_id"] is not None: + event = conn.execute( + "SELECT * FROM verification_events WHERE id = ?", + (state["last_event_id"],), + ).fetchone() + + changed_paths: list[str] = [] + try: + changed_paths = json.loads(state["changed_paths_json"] or "[]") + except (TypeError, ValueError): + changed_paths = [] + + if event is None: + return { + "status": "unverified", + "evidence": None, + "root": root, + "session_id": sid, + "changed_paths": changed_paths, + } + + evidence = dict(event) + if state["last_edit_at"] and state["last_edit_at"] > evidence["created_at"]: + status = "stale" + else: + status = evidence["status"] + return { + "status": status, + "evidence": evidence, + "root": root, + "session_id": sid, + "changed_paths": changed_paths, + } diff --git a/tools/file_tools.py b/tools/file_tools.py index ffae69a6012..3a9c10a520d 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -1266,8 +1266,43 @@ def _check_file_staleness(filepath: str, task_id: str) -> str | None: return None +def _mark_verification_stale( + task_id: str, + resolved_paths: list[str], + session_id: str | None = None, +) -> None: + """Best-effort note that successful edits made prior verification stale.""" + paths = [p for p in resolved_paths if p] + if not paths: + return + try: + from agent.coding_context import project_facts_for + from agent.verification_evidence import mark_workspace_edited + + cwd = None + for path in paths: + try: + candidate = str(Path(path).parent) + except Exception: + continue + if project_facts_for(candidate): + cwd = candidate + break + if cwd is None: + cwd = _authoritative_workspace_root(task_id) + if cwd is None: + try: + cwd = str(Path(paths[0]).parent) + except Exception: + cwd = None + mark_workspace_edited(session_id=session_id or task_id, 
cwd=cwd, paths=paths) + except Exception: + logger.debug("verification stale marker failed", exc_info=True) + + def write_file_tool(path: str, content: str, task_id: str = "default", - cross_profile: bool = False) -> str: + cross_profile: bool = False, + session_id: str | None = None) -> str: """Write content to a file. ``cross_profile`` opts out of the soft cross-Hermes-profile guard. The @@ -1305,6 +1340,8 @@ def write_file_tool(path: str, content: str, task_id: str = "default", result_dict = result.to_dict() if stale_warning: result_dict["_warning"] = stale_warning + if not result_dict.get("error"): + _mark_verification_stale(task_id, [path], session_id=session_id) _update_read_timestamp(path, task_id) return json.dumps(result_dict, ensure_ascii=False) @@ -1331,6 +1368,7 @@ def write_file_tool(path: str, content: str, task_id: str = "default", result_dict["resolved_path"] = _resolved if not result_dict.get("error"): result_dict["files_modified"] = [_resolved] + _mark_verification_stale(task_id, [_resolved], session_id=session_id) # Refresh stamps after the successful write so consecutive # writes by this task don't trigger false staleness warnings. _update_read_timestamp(path, task_id) @@ -1347,7 +1385,8 @@ def write_file_tool(path: str, content: str, task_id: str = "default", def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, new_string: str = None, replace_all: bool = False, patch: str = None, - task_id: str = "default", cross_profile: bool = False) -> str: + task_id: str = "default", cross_profile: bool = False, + session_id: str | None = None) -> str: """Patch a file using replace mode or V4A patch format. 
``cross_profile`` opts out of the soft cross-Hermes-profile guard for @@ -1465,6 +1504,7 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, result_dict["files_modified"] = _resolved_modified if len(_resolved_modified) == 1: result_dict["resolved_path"] = _resolved_modified[0] + _mark_verification_stale(task_id, _resolved_modified, session_id=session_id) for _p in _paths_to_check: _update_read_timestamp(_p, task_id) _r = _path_to_resolved.get(_p) @@ -1730,6 +1770,7 @@ def _handle_write_file(args, **kw): return write_file_tool( path=args["path"], content=args["content"], task_id=tid, cross_profile=bool(args.get("cross_profile", False)), + session_id=kw.get("session_id"), ) @@ -1740,6 +1781,7 @@ def _handle_patch(args, **kw): old_string=args.get("old_string"), new_string=args.get("new_string"), replace_all=args.get("replace_all", False), patch=args.get("patch"), task_id=tid, cross_profile=bool(args.get("cross_profile", False)), + session_id=kw.get("session_id"), ) diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index daff82ac34a..28e496f9ce3 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1872,6 +1872,7 @@ def terminal_tool( background: bool = False, timeout: Optional[int] = None, task_id: Optional[str] = None, + session_id: Optional[str] = None, force: bool = False, workdir: Optional[str] = None, pty: bool = False, @@ -1886,6 +1887,7 @@ def terminal_tool( background: Whether to run in background (default: False) timeout: Command timeout in seconds (default: from config) task_id: Unique identifier for environment isolation (optional) + session_id: Conversation/session identifier for durable observability force: If True, skip dangerous command check (use after user confirms) workdir: Working directory for this command (optional, uses session cwd if not set) pty: If True, use pseudo-terminal for interactive CLI tools (local backend only) @@ -2441,16 +2443,18 @@ def terminal_tool( max_retries = 3 
retry_count = 0 result = None + command_cwd = None while retry_count <= max_retries: try: + command_cwd = _resolve_command_cwd( + workdir=workdir, + env=env, + default_cwd=cwd, + ) execute_kwargs = { "timeout": effective_timeout, - "cwd": _resolve_command_cwd( - workdir=workdir, - env=env, - default_cwd=cwd, - ), + "cwd": command_cwd, } result = env.execute(command, **execute_kwargs) except Exception as e: @@ -2541,6 +2545,25 @@ def terminal_tool( "exit_code": returncode, "error": None, } + try: + from agent.verification_evidence import record_terminal_result + + evidence = record_terminal_result( + command=command, + cwd=command_cwd, + session_id=session_id or task_id or effective_task_id or "default", + exit_code=returncode, + output=output, + ) + if evidence: + result_dict["verification_evidence"] = { + "status": evidence.get("status"), + "kind": evidence.get("kind"), + "scope": evidence.get("scope"), + "canonical_command": evidence.get("canonical_command"), + } + except Exception: + logger.debug("verification evidence recording failed", exc_info=True) if approval_note: result_dict["approval"] = approval_note if exit_note: @@ -2774,6 +2797,7 @@ def _handle_terminal(args, **kw): background=args.get("background", False), timeout=args.get("timeout"), task_id=kw.get("task_id"), + session_id=kw.get("session_id"), workdir=args.get("workdir"), pty=args.get("pty", False), notify_on_complete=args.get("notify_on_complete", False),