Merge pull request #52285 from NousResearch/bb/verify-ledger

feat(agent): record coding verification evidence
This commit is contained in:
brooklyn! 2026-06-24 23:07:10 -05:00 committed by GitHub
commit da0320bf40
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 933 additions and 7 deletions

View file

@ -0,0 +1,547 @@
"""Coding verification evidence ledger.
This module records what the agent actually proved while working in a code
workspace. It is deliberately passive: it never decides to run a suite, never
blocks completion, and never upgrades targeted checks into "repo green".
"""
from __future__ import annotations
import json
import re
import shlex
import sqlite3
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
from hermes_constants import get_hermes_home
# Process-wide lock held around every ledger database access in this module.
_DB_LOCK = threading.Lock()
# Outputs longer than this are reduced to a head/tail excerpt by _summarize_output.
_MAX_OUTPUT_SUMMARY_CHARS = 2000
# Retention window, in days, used by _retention_cutoff when pruning.
_MAX_EVIDENCE_AGE_DAYS = 30
# Number of most recent events kept per (session_id, root) pair when pruning.
_MAX_EVENTS_PER_SESSION_ROOT = 100
# Global cap on retained events; rows referenced by verification_state are exempt.
_MAX_TOTAL_UNREFERENCED_EVENTS = 10_000
# Value written to the meta table under the 'schema_version' key.
_VERIFY_SCHEMA_VERSION = 1
# Splits a command line on the shell separators `&&`, `||` and `;`.
_SHELL_SPLIT_RE = re.compile(r"\s*(?:&&|\|\||;)\s*")
@dataclass(frozen=True)
class VerificationEvidence:
    """Immutable description of one command classified as verification.

    Instances are produced by ``classify_verification_command``. The field
    order is part of the positional constructor signature, so it must not be
    rearranged.
    """

    # Command line exactly as it was passed in for classification.
    command: str
    # The detected project verify command that `command` was matched against.
    canonical_command: str
    # Category derived from the canonical command (e.g. "test", "lint").
    kind: str
    # "targeted" when trailing arguments look like files/tests, else "full".
    scope: str
    # "passed" for exit code 0, "failed" otherwise.
    status: str
    exit_code: int
    # Resolved working directory the command ran in.
    cwd: str
    # Project root the evidence is attributed to.
    root: str
    session_id: str
    # Possibly elided excerpt of the command output.
    output_summary: str = ""
def _utc_now() -> str:
    """Return the current time in UTC as an ISO-8601 string with offset."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
def _retention_cutoff() -> str:
    """Return the ISO-8601 UTC timestamp before which evidence is expired."""
    max_age = timedelta(days=_MAX_EVIDENCE_AGE_DAYS)
    oldest_kept = datetime.now(timezone.utc) - max_age
    return oldest_kept.isoformat()
def _db_path() -> Path:
    """Return the ledger database path inside the directory from get_hermes_home()."""
    home = get_hermes_home()
    return home / "verification_evidence.db"
def _connect() -> sqlite3.Connection:
    """Open the ledger database, creating the directory and schema if needed.

    Returns:
        A connection with WAL journaling, a 5 second busy timeout and
        ``sqlite3.Row`` rows. The caller owns it and must close it.

    Raises:
        sqlite3.Error: if configuring the connection or creating the schema
            fails. The half-initialised connection is closed before the
            error propagates so it does not leak.
    """
    path = _db_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=5000")
        conn.row_factory = sqlite3.Row
        _ensure_schema(conn)
    except BaseException:
        # Nobody else holds a reference yet, so release the handle here.
        conn.close()
        raise
    return conn
def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the ledger tables and index if absent and stamp the schema version.

    Every statement is idempotent (``IF NOT EXISTS`` / ``INSERT OR REPLACE``),
    so this is safe to run on each new connection. Commits before returning.
    """
    ddl_statements = (
        # Key/value metadata; currently only holds the schema version.
        """
        CREATE TABLE IF NOT EXISTS meta (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )
        """,
        # One row per recorded verification command.
        """
        CREATE TABLE IF NOT EXISTS verification_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            created_at TEXT NOT NULL,
            session_id TEXT NOT NULL,
            cwd TEXT NOT NULL,
            root TEXT NOT NULL,
            command TEXT NOT NULL,
            canonical_command TEXT NOT NULL,
            kind TEXT NOT NULL,
            scope TEXT NOT NULL,
            status TEXT NOT NULL,
            exit_code INTEGER NOT NULL,
            output_summary TEXT NOT NULL
        )
        """,
        # Latest-known state per (session, root): last event and last edit.
        """
        CREATE TABLE IF NOT EXISTS verification_state (
            session_id TEXT NOT NULL,
            root TEXT NOT NULL,
            last_event_id INTEGER,
            last_edit_at TEXT,
            changed_paths_json TEXT NOT NULL DEFAULT '[]',
            PRIMARY KEY (session_id, root)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_verification_events_session_root
        ON verification_events(session_id, root, id DESC)
        """,
    )
    for statement in ddl_statements:
        conn.execute(statement)

    version = str(_VERIFY_SCHEMA_VERSION)
    conn.execute(
        "INSERT OR REPLACE INTO meta(key, value) VALUES ('schema_version', ?)",
        (version,),
    )
    conn.commit()
def _split_segment_tokens(command: str) -> list[list[str]]:
    """Split a command line on shell separators and tokenize each piece.

    Pieces that are empty, that ``shlex`` cannot parse, or that yield no
    tokens are dropped.
    """

    def _tokenize(piece: str) -> list[str]:
        # An unparsable piece (e.g. unbalanced quotes) contributes nothing.
        try:
            return shlex.split(piece)
        except ValueError:
            return []

    pieces = _SHELL_SPLIT_RE.split(command.strip())
    tokenized = (_tokenize(piece) for piece in pieces if piece)
    return [words for words in tokenized if words]
def _clean_token(token: str) -> str:
    """Trim surrounding whitespace and every leading ``./`` from a token."""
    cleaned = token.strip()
    while cleaned[:2] == "./":
        cleaned = cleaned[2:]
    return cleaned
def _canonical_tokens(canonical: str) -> list[str]:
    """Tokenize a canonical command and clean each token.

    Returns an empty list when ``shlex`` cannot parse the command.
    """
    try:
        raw_tokens = shlex.split(canonical)
    except ValueError:
        return []
    return [_clean_token(raw) for raw in raw_tokens if raw]
def _find_subsequence(tokens: list[str], needle: list[str]) -> Optional[int]:
    """Return the first index at which ``needle`` occurs contiguously in ``tokens``.

    Tokens are cleaned with ``_clean_token`` before comparison; ``needle`` is
    compared as given. Returns ``None`` when either list is empty, when the
    needle is longer than the tokens, or when there is no match.
    """
    if not tokens or not needle:
        return None
    width = len(needle)
    if width > len(tokens):
        return None
    normalized = [_clean_token(tok) for tok in tokens]
    last_start = len(normalized) - width
    return next(
        (
            start
            for start in range(last_start + 1)
            if normalized[start:start + width] == needle
        ),
        None,
    )
def _strip_command_prefix(tokens: list[str]) -> list[str]:
    """Return ``tokens`` without leading wrappers that do not change the command.

    Dropped, in this order: one leading ``env``, then any run of
    ``NAME=value`` assignments (tokens containing ``=`` that are not options),
    then any run of ``command`` / ``time`` / ``noglob``. The input list is
    not modified.
    """
    wrappers = {"command", "time", "noglob"}
    total = len(tokens)
    start = 1 if total and tokens[0] == "env" else 0
    while start < total and "=" in tokens[start] and not tokens[start].startswith("-"):
        start += 1
    while start < total and tokens[start] in wrappers:
        start += 1
    return list(tokens[start:])
def _equivalent_needles(needle: list[str]) -> list[list[str]]:
    """List the token spellings treated as the same command as ``needle``.

    The needle itself always comes first. Added on top of it:
    * ``<pm> run <script>`` also matches ``<pm> <script>`` for npm, pnpm,
      yarn and bun;
    * a single path-like token also matches ``bash <path>`` and ``sh <path>``;
    * bare ``pytest`` also matches the module form and the uv/poetry/pipenv
      ``run`` wrappers.
    """
    spellings = [needle]
    size = len(needle)

    if size >= 3 and needle[1] == "run" and needle[0] in {"npm", "pnpm", "yarn", "bun"}:
        spellings.append([needle[0], needle[2]])

    if size == 1:
        only = needle[0]
        if "/" in only:
            spellings += [["bash", only], ["sh", only]]
        if only == "pytest":
            launchers = (
                ("python", "-m"),
                ("python3", "-m"),
                ("uv", "run"),
                ("poetry", "run"),
                ("pipenv", "run"),
            )
            spellings.extend([*launcher, "pytest"] for launcher in launchers)

    return spellings
def _find_canonical_match(command: str, canonical_commands: list[str]) -> Optional[tuple[str, list[str]]]:
    """Return ``(canonical, trailing_args)`` for the first detected command.

    Canonical commands are tried in the given order; for each one, every
    shell segment of ``command`` is checked after harmless prefixes are
    stripped. A segment matches when it *starts with* one of the canonical
    command's equivalent spellings.

    The executed tokens are normalised with ``_clean_token`` before the
    comparison, exactly like the canonical tokens are. Without that,
    ``./scripts/run_tests.sh`` would never match the canonical
    ``scripts/run_tests.sh``. The returned trailing arguments are the
    original, un-normalised tokens.

    Returns ``None`` when nothing matches.
    """
    segments = _split_segment_tokens(command)
    for canonical in canonical_commands:
        needle = _canonical_tokens(canonical)
        if not needle:
            continue
        # Spellings depend only on the needle, so compute them once per canonical.
        spellings = _equivalent_needles(needle)
        for tokens in segments:
            candidate_tokens = _strip_command_prefix(tokens)
            comparable = [_clean_token(tok) for tok in candidate_tokens]
            for spelling in spellings:
                width = len(spelling)
                if comparable[:width] == spelling:
                    return canonical, candidate_tokens[width:]
    return None
def _kind_for_command(canonical: str) -> str:
    """Map a canonical verify command to a coarse evidence kind.

    The first matching rule wins, checked in this order: ``lint``,
    ``typecheck``, ``build``, ``format``, ``check``; anything else is
    ``test``. Matching is case-insensitive.

    The ``ty`` type checker is matched as a whole word only. A bare
    substring test would also hit unrelated names such as ``security`` or
    ``style`` and misreport test runs as type checks. Names containing
    ``type`` (``type-check``, ``check-types``, ``pytype``) still count as
    type checking.
    """
    lowered = canonical.lower()
    words = set(re.findall(r"[a-z0-9]+", lowered))

    if any(marker in lowered for marker in ("lint", "eslint", "ruff")):
        return "lint"
    if "ty" in words or any(
        marker in lowered for marker in ("typecheck", "type", "tsc", "mypy", "pyright")
    ):
        return "typecheck"
    if "build" in lowered:
        return "build"
    if "fmt" in lowered or "format" in lowered:
        return "format"
    if "check" in lowered and "test" not in lowered:
        return "check"
    return "test"
def _looks_like_target(arg: str) -> bool:
    """Tell whether a trailing argument names a specific file, directory or test.

    Empty strings, options (leading ``-``) and anything containing ``=`` are
    never targets. Otherwise an argument is a target when it contains a path
    separator or ``::``, ends with a known source extension, or starts with a
    conventional test prefix.
    """
    if not arg:
        return False
    if arg.startswith("-") or "=" in arg:
        return False
    if any(marker in arg for marker in ("/", "\\", "::")):
        return True
    if arg.endswith((".py", ".js", ".jsx", ".ts", ".tsx", ".rs", ".go", ".java")):
        return True
    return arg.startswith(("test_", "tests", "spec", "__tests__"))
def _scope_for_args(args: list[str]) -> str:
    """Return ``"targeted"`` if any argument looks like a target, else ``"full"``."""
    for arg in args:
        if _looks_like_target(arg):
            return "targeted"
    return "full"
def _summarize_output(output: str) -> str:
    """Return stripped output, elided in the middle when it is too long.

    Text within ``_MAX_OUTPUT_SUMMARY_CHARS`` is returned unchanged. Longer
    text keeps the first third and the remaining budget from the end, joined
    by a marker stating how many characters were dropped (the marker itself
    is not counted against the budget).
    """
    text = (output or "").strip()
    limit = _MAX_OUTPUT_SUMMARY_CHARS
    omitted = len(text) - limit
    if omitted <= 0:
        return text
    head_len = limit // 3
    tail_len = limit - head_len
    marker = f"\n... [{omitted} chars omitted] ...\n"
    return "".join((text[:head_len], marker, text[-tail_len:]))
def _prune_old_events(conn: sqlite3.Connection, *, session_id: str, root: str) -> None:
    """Bound ledger growth without deleting the current state pointer.

    Runs four deletes in order on ``conn``; the caller is responsible for
    committing. The order matters: expired state rows are removed before
    expired events so the events they pointed at become unreferenced.
    """
    cutoff = _retention_cutoff()

    # 1. Keep only the newest N events for this session/root.
    trim_session_sql = """
        DELETE FROM verification_events
        WHERE session_id = ?
        AND root = ?
        AND id NOT IN (
            SELECT id FROM verification_events
            WHERE session_id = ? AND root = ?
            ORDER BY id DESC
            LIMIT ?
        )
    """
    # 2. Drop state rows whose last edit, or (absent an edit) whose last
    #    event, is older than the retention cutoff.
    expire_state_sql = """
        DELETE FROM verification_state
        WHERE (
            last_edit_at IS NOT NULL
            AND last_edit_at < ?
        )
        OR (
            last_edit_at IS NULL
            AND last_event_id IN (
                SELECT id FROM verification_events
                WHERE created_at < ?
            )
        )
    """
    # 3. Drop expired events that no remaining state row points at.
    expire_events_sql = """
        DELETE FROM verification_events
        WHERE created_at < ?
        AND id NOT IN (
            SELECT last_event_id FROM verification_state
            WHERE last_event_id IS NOT NULL
        )
    """
    # 4. Enforce the global cap, again sparing referenced events.
    cap_total_sql = """
        DELETE FROM verification_events
        WHERE id NOT IN (
            SELECT id FROM verification_events
            ORDER BY id DESC
            LIMIT ?
        )
        AND id NOT IN (
            SELECT last_event_id FROM verification_state
            WHERE last_event_id IS NOT NULL
        )
    """

    conn.execute(
        trim_session_sql,
        (session_id, root, session_id, root, _MAX_EVENTS_PER_SESSION_ROOT),
    )
    conn.execute(expire_state_sql, (cutoff, cutoff))
    conn.execute(expire_events_sql, (cutoff,))
    conn.execute(cap_total_sql, (_MAX_TOTAL_UNREFERENCED_EVENTS,))
def classify_verification_command(
    command: str,
    *,
    cwd: str | Path | None = None,
    session_id: str | None = None,
    exit_code: int = 0,
    output: str = "",
) -> Optional[VerificationEvidence]:
    """Classify a terminal command as verification evidence, if applicable.

    Args:
        command: The command line that was executed.
        cwd: Directory the command ran in; ``"."`` is used when falsy.
        session_id: Session to attribute the evidence to; ``"default"`` when falsy.
        exit_code: Process exit status; ``0`` means passed.
        output: Captured output, stored in summarized form.

    Returns:
        A ``VerificationEvidence`` when the command matches one of the
        project's ``verifyCommands``; ``None`` when the command is empty or
        not a string, when project facts cannot be obtained, or when nothing
        matches.
    """
    if not (isinstance(command, str) and command):
        return None

    # Project detection is best-effort: any failure means "not applicable".
    try:
        from agent.coding_context import project_facts_for

        facts = project_facts_for(cwd)
    except Exception:
        facts = None
    if not facts:
        return None

    known_commands = list(facts.get("verifyCommands") or [])
    found = _find_canonical_match(command, known_commands)
    if found is None:
        return None
    canonical, trailing_args = found

    code = int(exit_code)
    resolved_cwd = Path(cwd or ".").resolve()
    return VerificationEvidence(
        command=command,
        canonical_command=canonical,
        kind=_kind_for_command(canonical),
        scope=_scope_for_args(trailing_args),
        status="passed" if code == 0 else "failed",
        exit_code=code,
        cwd=str(resolved_cwd),
        root=str(facts.get("root") or resolved_cwd),
        session_id=str(session_id or "default"),
        output_summary=_summarize_output(output),
    )
def record_terminal_result(
    *,
    command: str,
    cwd: str | Path | None,
    session_id: str | None,
    exit_code: int,
    output: str = "",
) -> Optional[dict[str, Any]]:
    """Record a foreground terminal result when it is verification evidence.

    The command is classified first; non-verification commands are ignored.
    Otherwise an event row is inserted, the (session, root) state is pointed
    at it with the pending-edit marker and changed paths cleared, and old
    events are pruned, all in one transaction.

    Returns:
        ``None`` when the command is not verification evidence, else a dict
        with the new event ``id``, every ``VerificationEvidence`` field and
        ``created_at``.

    Raises:
        RuntimeError: if SQLite does not report an id for the inserted row.
        sqlite3.Error: on database failures (the transaction is rolled back).
    """
    evidence = classify_verification_command(
        command,
        cwd=cwd,
        session_id=session_id,
        exit_code=exit_code,
        output=output,
    )
    if evidence is None:
        return None

    created_at = _utc_now()
    with _DB_LOCK:
        conn = _connect()
        try:
            # `with conn` only scopes the transaction (commit / rollback);
            # it does not close the connection, hence the explicit close().
            with conn:
                cur = conn.execute(
                    """
                    INSERT INTO verification_events(
                        created_at, session_id, cwd, root, command, canonical_command,
                        kind, scope, status, exit_code, output_summary
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        created_at,
                        evidence.session_id,
                        evidence.cwd,
                        evidence.root,
                        evidence.command,
                        evidence.canonical_command,
                        evidence.kind,
                        evidence.scope,
                        evidence.status,
                        evidence.exit_code,
                        evidence.output_summary,
                    ),
                )
                if cur.lastrowid is None:
                    raise RuntimeError("verification event insert did not return an id")
                event_id = int(cur.lastrowid)
                # Fresh evidence supersedes any pending edit marker.
                conn.execute(
                    """
                    INSERT INTO verification_state(
                        session_id, root, last_event_id, last_edit_at, changed_paths_json
                    ) VALUES (?, ?, ?, NULL, '[]')
                    ON CONFLICT(session_id, root) DO UPDATE SET
                        last_event_id = excluded.last_event_id,
                        last_edit_at = NULL,
                        changed_paths_json = '[]'
                    """,
                    (evidence.session_id, evidence.root, event_id),
                )
                _prune_old_events(conn, session_id=evidence.session_id, root=evidence.root)
        finally:
            conn.close()
    return {"id": event_id, **evidence.__dict__, "created_at": created_at}
def mark_workspace_edited(
    *,
    session_id: str | None,
    cwd: str | Path | None,
    paths: list[str] | tuple[str, ...] | None = None,
) -> Optional[dict[str, Any]]:
    """Mark verification evidence stale after a successful file edit.

    Stamps ``last_edit_at`` on the (session, root) state row, creating it if
    needed, and merges ``paths`` into the stored changed-path list. The
    pointer to the last verification event is left untouched.

    Args:
        session_id: Session that made the edit; ``"default"`` when falsy.
        cwd: Directory used to locate the project; ``"."`` when falsy.
        paths: Edited paths; falsy entries are ignored.

    Returns:
        ``None`` when project facts cannot be obtained for ``cwd``. Otherwise
        a dict with ``session_id``, ``root``, ``last_edit_at`` and
        ``changed_paths``, the latter holding only the paths from this call,
        not the merged list that was stored.
    """
    # Project detection is best-effort: any failure means "not applicable".
    try:
        from agent.coding_context import project_facts_for

        facts = project_facts_for(cwd)
    except Exception:
        facts = None
    if not facts:
        return None

    sid = str(session_id or "default")
    root = str(facts.get("root") or Path(cwd or ".").resolve())
    changed_paths = sorted({str(p) for p in (paths or []) if p})
    edited_at = _utc_now()

    with _DB_LOCK:
        conn = _connect()
        try:
            # `with conn` only scopes the transaction (commit / rollback);
            # it does not close the connection, hence the explicit close().
            with conn:
                row = conn.execute(
                    """
                    SELECT changed_paths_json FROM verification_state
                    WHERE session_id = ? AND root = ?
                    """,
                    (sid, root),
                ).fetchone()
                existing: set[str] = set()
                if row is not None:
                    try:
                        existing = set(json.loads(row["changed_paths_json"] or "[]"))
                    except (TypeError, ValueError):
                        # Corrupt or unexpected JSON: start over from this edit.
                        existing = set()
                # Bounded to 200 entries; when over, the alphabetically last are kept.
                merged = sorted(existing | set(changed_paths))[-200:]
                conn.execute(
                    """
                    INSERT INTO verification_state(
                        session_id, root, last_event_id, last_edit_at, changed_paths_json
                    ) VALUES (?, ?, NULL, ?, ?)
                    ON CONFLICT(session_id, root) DO UPDATE SET
                        last_edit_at = excluded.last_edit_at,
                        changed_paths_json = excluded.changed_paths_json
                    """,
                    (sid, root, edited_at, json.dumps(merged)),
                )
        finally:
            conn.close()
    return {"session_id": sid, "root": root, "last_edit_at": edited_at, "changed_paths": changed_paths}
def verification_status(
    *,
    session_id: str | None,
    cwd: str | Path | None,
) -> dict[str, Any]:
    """Return the best known verification state for a session/workspace.

    Args:
        session_id: Session to look up; ``"default"`` when falsy.
        cwd: Directory used to locate the project; ``"."`` when falsy.

    Returns:
        A dict whose ``status`` is one of:

        * ``"not_applicable"``: no project facts for ``cwd`` (only
          ``status`` and ``evidence`` keys are present);
        * ``"unverified"``: no state row, or no event behind it;
        * ``"stale"``: an edit was recorded after the last event;
        * otherwise the last event's own status.

        Except for ``not_applicable``, the dict also carries ``evidence``
        (the event row as a dict, or ``None``), ``root``, ``session_id`` and
        ``changed_paths``.
    """
    # Project detection is best-effort: any failure means "not applicable".
    try:
        from agent.coding_context import project_facts_for

        facts = project_facts_for(cwd)
    except Exception:
        facts = None
    if not facts:
        return {"status": "not_applicable", "evidence": None}

    sid = str(session_id or "default")
    root = str(facts.get("root") or Path(cwd or ".").resolve())

    with _DB_LOCK:
        conn = _connect()
        try:
            state = conn.execute(
                """
                SELECT last_event_id, last_edit_at, changed_paths_json
                FROM verification_state
                WHERE session_id = ? AND root = ?
                """,
                (sid, root),
            ).fetchone()
            event = None
            if state is not None and state["last_event_id"] is not None:
                event = conn.execute(
                    "SELECT * FROM verification_events WHERE id = ?",
                    (state["last_event_id"],),
                ).fetchone()
        finally:
            # The connection context manager would not close it; do so here.
            conn.close()

    result: dict[str, Any] = {
        "status": "unverified",
        "evidence": None,
        "root": root,
        "session_id": sid,
        "changed_paths": [],
    }
    if state is None:
        return result

    try:
        result["changed_paths"] = json.loads(state["changed_paths_json"] or "[]")
    except (TypeError, ValueError):
        result["changed_paths"] = []
    if event is None:
        return result

    evidence = dict(event)
    # Both timestamps are ISO-8601 UTC strings written by this module, so
    # they are compared as text.
    if state["last_edit_at"] and state["last_edit_at"] > evidence["created_at"]:
        result["status"] = "stale"
    else:
        result["status"] = evidence["status"]
    result["evidence"] = evidence
    return result

View file

@ -0,0 +1,313 @@
import json
import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path
from agent.verification_evidence import (
classify_verification_command,
mark_workspace_edited,
record_terminal_result,
verification_status,
)
def _node_project(root: Path) -> None:
    """Populate ``root`` with a minimal pnpm project plus a test runner script."""
    manifest = {"scripts": {"test": "vitest", "lint": "eslint .", "dev": "vite"}}
    (root / "package.json").write_text(json.dumps(manifest))
    (root / "pnpm-lock.yaml").write_text("")
    script_dir = root / "scripts"
    script_dir.mkdir()
    (script_dir / "run_tests.sh").write_text("#!/bin/sh\n")
def _python_project(root: Path) -> None:
    """Populate ``root`` with a pyproject.toml that declares a pytest section."""
    pyproject = root / "pyproject.toml"
    pyproject.write_text("[tool.pytest.ini_options]\n")
def test_classifies_targeted_project_verify_command(tmp_path, monkeypatch):
    """A detected script run against one test file is targeted, passed test evidence."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)

    result = classify_verification_command(
        "scripts/run_tests.sh tests/test_widget.py -q",
        cwd=tmp_path,
        session_id="s1",
        exit_code=0,
        output="1 passed",
    )

    assert result is not None
    observed = (result.canonical_command, result.kind, result.scope, result.status)
    assert observed == ("scripts/run_tests.sh", "test", "targeted", "passed")
def test_classifies_python_module_pytest_as_detected_pytest(tmp_path, monkeypatch):
    """``python -m pytest`` maps to canonical ``pytest``; a non-zero exit is failed."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _python_project(tmp_path)

    result = classify_verification_command(
        "python -m pytest tests/test_calc.py::test_even -q",
        cwd=tmp_path,
        session_id="s1",
        exit_code=1,
        output="failed",
    )

    assert result is not None
    observed = (result.canonical_command, result.kind, result.scope, result.status)
    assert observed == ("pytest", "test", "targeted", "failed")
def test_records_passed_then_marks_stale_after_edit(tmp_path, monkeypatch):
    """A passing run reads as passed until a later edit makes it stale."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)
    edited_file = str(tmp_path / "src" / "app.ts")

    recorded = record_terminal_result(
        command="scripts/run_tests.sh",
        cwd=tmp_path,
        session_id="s1",
        exit_code=0,
        output="all green",
    )
    assert recorded is not None
    before_edit = verification_status(session_id="s1", cwd=tmp_path)
    assert before_edit["status"] == "passed"

    mark_workspace_edited(session_id="s1", cwd=tmp_path, paths=[edited_file])

    after_edit = verification_status(session_id="s1", cwd=tmp_path)
    assert after_edit["status"] == "stale"
    assert after_edit["changed_paths"] == [edited_file]
def test_lint_and_typecheck_are_not_reported_as_full_tests(tmp_path, monkeypatch):
    """Lint runs keep their own kind; a test run with a file argument is targeted."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)
    shared = {"cwd": tmp_path, "session_id": "s1", "exit_code": 0}

    lint_result = classify_verification_command("pnpm run lint", **shared)
    test_result = classify_verification_command(
        "pnpm run test -- tests/button.test.tsx", **shared
    )

    assert lint_result is not None
    assert (lint_result.kind, lint_result.scope) == ("lint", "full")
    assert test_result is not None
    assert (test_result.kind, test_result.scope) == ("test", "targeted")
def test_package_script_shorthand_matches_canonical_verify_command(tmp_path, monkeypatch):
    """``pnpm test`` is recognised as the canonical ``pnpm run test``."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)

    result = classify_verification_command(
        "pnpm test -- tests/button.test.tsx",
        cwd=tmp_path,
        session_id="s1",
        exit_code=0,
    )

    assert result is not None
    assert (result.canonical_command, result.scope) == ("pnpm run test", "targeted")
def test_shell_wrappers_match_but_echo_does_not(tmp_path, monkeypatch):
    """``env``/``bash`` wrappers are seen through; merely echoing the command is not evidence."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)
    shared = {"cwd": tmp_path, "session_id": "s1", "exit_code": 0}

    via_wrapper = classify_verification_command(
        "env CI=1 bash scripts/run_tests.sh tests/test_widget.py", **shared
    )
    via_echo = classify_verification_command(
        "echo scripts/run_tests.sh tests/test_widget.py", **shared
    )

    assert via_wrapper is not None
    assert via_wrapper.canonical_command == "scripts/run_tests.sh"
    assert via_wrapper.scope == "targeted"
    assert via_echo is None
def test_uv_run_pytest_matches_detected_pytest(tmp_path, monkeypatch):
    """``uv run pytest`` is recognised as the canonical ``pytest``."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _python_project(tmp_path)

    result = classify_verification_command(
        "uv run pytest tests/test_calc.py",
        cwd=tmp_path,
        session_id="s1",
        exit_code=0,
    )

    assert result is not None
    assert (result.canonical_command, result.scope) == ("pytest", "targeted")
def test_status_is_unverified_without_evidence(tmp_path, monkeypatch):
    """A project with nothing recorded reports ``unverified``."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)

    report = verification_status(session_id="s1", cwd=tmp_path)

    assert report["status"] == "unverified"
def test_edit_without_prior_evidence_stays_unverified(tmp_path, monkeypatch):
    """An edit with no earlier verification is unverified, but the path is kept."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)
    edited_file = str(tmp_path / "src" / "app.ts")

    mark_workspace_edited(session_id="s1", cwd=tmp_path, paths=[edited_file])

    report = verification_status(session_id="s1", cwd=tmp_path)
    assert report["status"] == "unverified"
    assert report["changed_paths"] == [edited_file]
def test_file_tool_stales_evidence_by_session_id_for_absolute_edit(tmp_path, monkeypatch):
    """The write tool stales evidence under ``session_id``, not under ``task_id``."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
    _node_project(tmp_path)
    target = tmp_path / "src" / "app.ts"
    target.parent.mkdir()
    record_terminal_result(
        command="pnpm test",
        cwd=tmp_path,
        session_id="conversation",
        exit_code=0,
        output="green",
    )

    from tools.file_tools import write_file_tool

    raw_response = write_file_tool(
        str(target),
        "export const ok = true\n",
        task_id="turn",
        session_id="conversation",
    )
    response = json.loads(raw_response)

    assert response["files_modified"] == [str(target.resolve())]
    by_session = verification_status(session_id="conversation", cwd=tmp_path)
    by_task = verification_status(session_id="turn", cwd=tmp_path)
    assert by_session["status"] == "stale"
    assert by_task["status"] == "unverified"
def test_recording_prunes_old_events_but_keeps_latest_state(tmp_path, monkeypatch):
    """After 120 recordings only the newest 100 events remain and status is intact."""
    home = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(home))
    _node_project(tmp_path)

    for run_number in range(120):
        record_terminal_result(
            command="pnpm test",
            cwd=tmp_path,
            session_id="s1",
            exit_code=0,
            output=f"green {run_number}",
        )

    with sqlite3.connect(home / "verification_evidence.db") as conn:
        (event_count,) = conn.execute(
            "SELECT COUNT(*) FROM verification_events"
        ).fetchone()
        (latest_summary,) = conn.execute(
            """
            SELECT output_summary
            FROM verification_events
            ORDER BY id DESC
            LIMIT 1
            """
        ).fetchone()

    assert event_count == 100
    assert latest_summary == "green 119"
    assert verification_status(session_id="s1", cwd=tmp_path)["status"] == "passed"
def test_recording_expires_old_current_evidence(tmp_path, monkeypatch):
    """Evidence older than the retention window is dropped by the next recording."""
    home = tmp_path / ".hermes"
    db_file = home / "verification_evidence.db"
    monkeypatch.setenv("HERMES_HOME", str(home))
    _node_project(tmp_path)

    record_terminal_result(
        command="pnpm test",
        cwd=tmp_path,
        session_id="old-session",
        exit_code=0,
        output="old green",
    )
    # Back-date the only event so it falls outside the retention window.
    expired_stamp = (datetime.now(timezone.utc) - timedelta(days=31)).isoformat()
    with sqlite3.connect(db_file) as conn:
        conn.execute("UPDATE verification_events SET created_at = ?", (expired_stamp,))
        conn.commit()

    record_terminal_result(
        command="pnpm test",
        cwd=tmp_path,
        session_id="new-session",
        exit_code=0,
        output="new green",
    )

    assert verification_status(session_id="old-session", cwd=tmp_path)["status"] == "unverified"
    assert verification_status(session_id="new-session", cwd=tmp_path)["status"] == "passed"
    with sqlite3.connect(db_file) as conn:
        (old_rows,) = conn.execute(
            "SELECT COUNT(*) FROM verification_events WHERE session_id = 'old-session'"
        ).fetchone()
    assert old_rows == 0
def test_recording_expires_old_edit_only_state(tmp_path, monkeypatch):
    """An edit-only state row past the retention window is removed by the next recording."""
    home = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(home))
    _node_project(tmp_path)

    mark_workspace_edited(
        session_id="old-session",
        cwd=tmp_path,
        paths=[str(tmp_path / "src" / "app.ts")],
    )
    # Back-date the edit so the state row falls outside the retention window.
    expired_stamp = (datetime.now(timezone.utc) - timedelta(days=31)).isoformat()
    with sqlite3.connect(home / "verification_evidence.db") as conn:
        conn.execute("UPDATE verification_state SET last_edit_at = ?", (expired_stamp,))
        conn.commit()

    record_terminal_result(
        command="pnpm test",
        cwd=tmp_path,
        session_id="new-session",
        exit_code=0,
        output="new green",
    )

    report = verification_status(session_id="old-session", cwd=tmp_path)
    assert report["status"] == "unverified"
    assert report["changed_paths"] == []

View file

@ -1266,8 +1266,43 @@ def _check_file_staleness(filepath: str, task_id: str) -> str | None:
return None
def _mark_verification_stale(
    task_id: str,
    resolved_paths: list[str],
    session_id: str | None = None,
) -> None:
    """Best-effort note that successful edits made prior verification stale.

    The workspace directory handed to ``mark_workspace_edited`` is chosen as
    the first of: the parent directory of an edited path for which
    ``project_facts_for`` returns something truthy, the result of
    ``_authoritative_workspace_root(task_id)``, or the parent of the first
    edited path. Evidence is keyed by ``session_id``, falling back to
    ``task_id``. Any failure is logged at debug level and swallowed.
    """
    edited = [p for p in resolved_paths if p]
    if not edited:
        return

    def _parent_of(path: str) -> str | None:
        # A path that cannot be parsed simply yields no candidate directory.
        try:
            return str(Path(path).parent)
        except Exception:
            return None

    try:
        from agent.coding_context import project_facts_for
        from agent.verification_evidence import mark_workspace_edited

        workspace = None
        for edited_path in edited:
            parent = _parent_of(edited_path)
            if parent is not None and project_facts_for(parent):
                workspace = parent
                break
        if workspace is None:
            workspace = _authoritative_workspace_root(task_id)
        if workspace is None:
            workspace = _parent_of(edited[0])
        mark_workspace_edited(session_id=session_id or task_id, cwd=workspace, paths=edited)
    except Exception:
        logger.debug("verification stale marker failed", exc_info=True)
def write_file_tool(path: str, content: str, task_id: str = "default",
cross_profile: bool = False) -> str:
cross_profile: bool = False,
session_id: str | None = None) -> str:
"""Write content to a file.
``cross_profile`` opts out of the soft cross-Hermes-profile guard. The
@ -1305,6 +1340,8 @@ def write_file_tool(path: str, content: str, task_id: str = "default",
result_dict = result.to_dict()
if stale_warning:
result_dict["_warning"] = stale_warning
if not result_dict.get("error"):
_mark_verification_stale(task_id, [path], session_id=session_id)
_update_read_timestamp(path, task_id)
return json.dumps(result_dict, ensure_ascii=False)
@ -1331,6 +1368,7 @@ def write_file_tool(path: str, content: str, task_id: str = "default",
result_dict["resolved_path"] = _resolved
if not result_dict.get("error"):
result_dict["files_modified"] = [_resolved]
_mark_verification_stale(task_id, [_resolved], session_id=session_id)
# Refresh stamps after the successful write so consecutive
# writes by this task don't trigger false staleness warnings.
_update_read_timestamp(path, task_id)
@ -1347,7 +1385,8 @@ def write_file_tool(path: str, content: str, task_id: str = "default",
def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
new_string: str = None, replace_all: bool = False, patch: str = None,
task_id: str = "default", cross_profile: bool = False) -> str:
task_id: str = "default", cross_profile: bool = False,
session_id: str | None = None) -> str:
"""Patch a file using replace mode or V4A patch format.
``cross_profile`` opts out of the soft cross-Hermes-profile guard for
@ -1465,6 +1504,7 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
result_dict["files_modified"] = _resolved_modified
if len(_resolved_modified) == 1:
result_dict["resolved_path"] = _resolved_modified[0]
_mark_verification_stale(task_id, _resolved_modified, session_id=session_id)
for _p in _paths_to_check:
_update_read_timestamp(_p, task_id)
_r = _path_to_resolved.get(_p)
@ -1730,6 +1770,7 @@ def _handle_write_file(args, **kw):
return write_file_tool(
path=args["path"], content=args["content"], task_id=tid,
cross_profile=bool(args.get("cross_profile", False)),
session_id=kw.get("session_id"),
)
@ -1740,6 +1781,7 @@ def _handle_patch(args, **kw):
old_string=args.get("old_string"), new_string=args.get("new_string"),
replace_all=args.get("replace_all", False), patch=args.get("patch"), task_id=tid,
cross_profile=bool(args.get("cross_profile", False)),
session_id=kw.get("session_id"),
)

View file

@ -1872,6 +1872,7 @@ def terminal_tool(
background: bool = False,
timeout: Optional[int] = None,
task_id: Optional[str] = None,
session_id: Optional[str] = None,
force: bool = False,
workdir: Optional[str] = None,
pty: bool = False,
@ -1886,6 +1887,7 @@ def terminal_tool(
background: Whether to run in background (default: False)
timeout: Command timeout in seconds (default: from config)
task_id: Unique identifier for environment isolation (optional)
session_id: Conversation/session identifier for durable observability
force: If True, skip dangerous command check (use after user confirms)
workdir: Working directory for this command (optional, uses session cwd if not set)
pty: If True, use pseudo-terminal for interactive CLI tools (local backend only)
@ -2441,16 +2443,18 @@ def terminal_tool(
max_retries = 3
retry_count = 0
result = None
command_cwd = None
while retry_count <= max_retries:
try:
command_cwd = _resolve_command_cwd(
workdir=workdir,
env=env,
default_cwd=cwd,
)
execute_kwargs = {
"timeout": effective_timeout,
"cwd": _resolve_command_cwd(
workdir=workdir,
env=env,
default_cwd=cwd,
),
"cwd": command_cwd,
}
result = env.execute(command, **execute_kwargs)
except Exception as e:
@ -2541,6 +2545,25 @@ def terminal_tool(
"exit_code": returncode,
"error": None,
}
try:
from agent.verification_evidence import record_terminal_result
evidence = record_terminal_result(
command=command,
cwd=command_cwd,
session_id=session_id or task_id or effective_task_id or "default",
exit_code=returncode,
output=output,
)
if evidence:
result_dict["verification_evidence"] = {
"status": evidence.get("status"),
"kind": evidence.get("kind"),
"scope": evidence.get("scope"),
"canonical_command": evidence.get("canonical_command"),
}
except Exception:
logger.debug("verification evidence recording failed", exc_info=True)
if approval_note:
result_dict["approval"] = approval_note
if exit_note:
@ -2774,6 +2797,7 @@ def _handle_terminal(args, **kw):
background=args.get("background", False),
timeout=args.get("timeout"),
task_id=kw.get("task_id"),
session_id=kw.get("session_id"),
workdir=args.get("workdir"),
pty=args.get("pty", False),
notify_on_complete=args.get("notify_on_complete", False),