hermes-agent/hermes_cli/goals.py
Teknium a63a2b7c78
fix(goals): force judge to use tool calls instead of JSON-text replies (#23547)
Live-tested on gemini-3-flash-preview the judge kept returning empty
or non-JSON content, tripping the consecutive-parse-failures auto-
pause. Free-form JSON output is hopeful; tool-call schemas are
enforced server-side by virtually every modern provider.

Two new tools the judge calls:

  - submit_checklist(items)  — Phase A, decompose
  - update_checklist(updates, new_items, reason) — Phase B, evaluate

Both phases now call the auxiliary client with tool_choice forcing
the right tool. read_file remains for Phase B history inspection,
with the loop exiting only when update_checklist is called or the
read budget is exhausted (at which point read_file is dropped from
the toolbox and update_checklist is forced).

Robustness:
- _call_judge_with_tool_choice falls back tool_choice forced→required→
  auto if the provider rejects a particular shape.
- If a fully-broken provider still returns content instead of a tool
  call, the legacy JSON-text parsers stay around as a last-ditch
  backstop so we never silently lose a checklist.
- _normalize_update_args replaces the JSON parser for the apply
  layer; same 1-based→0-based conversion + terminal-status filter.

Live verification: same fizzbuzz goal that was hitting 'judge model
returned unparseable output 3 turns in a row' before now terminates
in 2 turns, all 11 items marked completed with item-specific
evidence, no auto-pause. Agent log shows
'produced 11 checklist items via tool call' instead of the JSON-
parse path.

Tests: 7 new cases for the tool-call path (Phase A success, Phase B
update only, Phase B read_file→update, JSON-content backstop,
empty-text item dropping, non-terminal status filter).
2026-05-10 20:51:40 -07:00

1871 lines
75 KiB
Python

"""Persistent session goals — the Ralph loop for Hermes.
A goal is a free-form user objective that stays active across turns. After
each turn completes, a small judge call asks an auxiliary model "is this
goal satisfied by the assistant's last response?". If not, Hermes feeds a
continuation prompt back into the same session and keeps working until the
goal is done, turn budget is exhausted, the user pauses/clears it, or the
user sends a new message (which takes priority and pauses the goal loop).
Checklist mode (added 2026-05): when a goal is set, a Phase-A "decompose"
call asks the judge to write an extremely detailed checklist of concrete
completion criteria for that goal. On every subsequent turn (Phase B) the
judge evaluates the agent's most recent output against EACH pending item
and may flip pending → completed | impossible, or append new items it
discovers along the way. The goal is done only when every checklist item
is in a terminal status. This is much harsher than the freeform
"is the goal done?" prompt and gives users a visible, verifiable progress
surface via /subgoal. A bounded read_file tool loop lets the judge inspect
the dumped conversation history when the snippet alone isn't enough to
rule.
State is persisted in SessionDB's ``state_meta`` table keyed by
``goal:<session_id>`` so ``/resume`` picks it up.
Design notes / invariants:
- The continuation prompt is just a normal user message appended to the
session via ``run_conversation``. No system-prompt mutation, no toolset
swap — prompt caching stays intact.
- Judge failures are fail-OPEN: ``continue``. A broken judge must not wedge
progress; the turn budget is the backstop.
- When a real user message arrives mid-loop it preempts the continuation
prompt and also pauses the goal loop for that turn (we still re-judge
after, so if the user's message happens to complete the goal the judge
will say ``done``).
- Stickiness: once an item is marked completed or impossible, only the user
(via /subgoal undo) can flip it back. Judge updates that try to regress
terminal items are silently ignored.
- This module has zero hard dependency on ``cli.HermesCLI`` or the gateway
runner — both wire the same ``GoalManager`` in.
Nothing in this module touches the agent's system prompt or toolset.
"""
from __future__ import annotations
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────
# Constants & defaults
# ──────────────────────────────────────────────────────────────────────
DEFAULT_MAX_TURNS = 20
DEFAULT_JUDGE_TIMEOUT = 60.0
# Cap how much of the last response we send to the judge inline. The judge
# can read the dumped conversation file via read_file if it needs more.
_JUDGE_RESPONSE_SNIPPET_CHARS = 4000
# After this many consecutive judge *parse* failures (empty output / non-JSON),
# the loop auto-pauses and points the user at the goal_judge config. API /
# transport errors do NOT count toward this — those are transient. This guards
# against small models (e.g. deepseek-v4-flash) that cannot follow the strict
# JSON reply contract; without it the loop runs until the turn budget is
# exhausted with every reply shaped like `judge returned empty response` or
# `judge reply was not JSON`.
DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES = 3
# Bound the Phase-B judge tool loop: if the judge keeps calling read_file
# without ever emitting a verdict, cap it so we don't burn the model's budget.
DEFAULT_MAX_JUDGE_TOOL_CALLS = 5
# Cap a single read_file response so a judge that tries to read 100k lines
# doesn't blow up its own context. Judge can paginate if needed.
_JUDGE_READ_FILE_MAX_LINES = 400
_JUDGE_READ_FILE_MAX_CHARS = 32_000
# Status constants ────────────────────────────────────────────────────
ITEM_PENDING = "pending"
ITEM_COMPLETED = "completed"
ITEM_IMPOSSIBLE = "impossible"
TERMINAL_ITEM_STATUSES = frozenset({ITEM_COMPLETED, ITEM_IMPOSSIBLE})
VALID_ITEM_STATUSES = frozenset({ITEM_PENDING, ITEM_COMPLETED, ITEM_IMPOSSIBLE})
ITEM_MARKERS = {
ITEM_COMPLETED: "[x]",
ITEM_IMPOSSIBLE: "[!]",
ITEM_PENDING: "[ ]",
}
ADDED_BY_JUDGE = "judge"
ADDED_BY_USER = "user"
# ──────────────────────────────────────────────────────────────────────
# Continuation prompt
# ──────────────────────────────────────────────────────────────────────
CONTINUATION_PROMPT_TEMPLATE = (
"[Continuing toward your standing goal]\n"
"Goal: {goal}\n\n"
"Continue working toward this goal. Take the next concrete step. "
"If you believe the goal is complete, state so explicitly and stop. "
"If you are blocked and need input from the user, say so clearly and stop."
)
CONTINUATION_PROMPT_WITH_CHECKLIST_TEMPLATE = (
"[Continuing toward your standing goal]\n"
"Goal: {goal}\n\n"
"Checklist progress ({done}/{total} done):\n"
"{checklist}\n\n"
"Work on the unchecked items above. Do not declare items done yourself "
"— a judge marks them based on evidence in your output. If an item is "
"genuinely impossible in this environment, explain why so the judge can "
"mark it impossible. If you are blocked on a remaining item and need "
"user input, say so clearly and stop."
)
# ──────────────────────────────────────────────────────────────────────
# Phase-A: decompose prompts
# ──────────────────────────────────────────────────────────────────────
DECOMPOSE_SYSTEM_PROMPT = (
"You are a strict judge for an autonomous agent. Your first job, before "
"judging anything, is to break the user's stated goal into an EXTREMELY "
"DETAILED checklist of concrete, verifiable completion criteria. Each "
"item must be specific enough that a third party reading the agent's "
"output could decide unambiguously whether that item was achieved.\n\n"
"Be exhaustive. Bias toward MORE items, not fewer. Include sub-items, "
"edge cases, quality bars, deployment steps, verification checks, and "
"anything the user would reasonably expect from a goal of this type. "
"If the user said 'build me a website' you should be enumerating "
"homepage exists, navigation links work, content is non-placeholder, "
"mobile responsive, accessibility tags present, deployed somewhere "
"publicly accessible, domain/URL is functional, etc. Better to "
"over-specify and let a few items get marked impossible than to "
"under-specify and let the agent declare victory early.\n\n"
"Submit your checklist by calling the ``submit_checklist`` tool. Do "
"not reply with prose or JSON in your message body — call the tool. "
"The system will not see anything you write outside the tool call."
)
DECOMPOSE_USER_PROMPT_TEMPLATE = (
"Goal:\n{goal}\n\n"
"Produce the harshest, most detailed checklist of completion criteria "
"you can. Aim for at least 5 items; more is better when warranted. "
"Each item should be a single verifiable statement of fact about the "
"finished work."
)
# ──────────────────────────────────────────────────────────────────────
# Phase-B: evaluate prompts
# ──────────────────────────────────────────────────────────────────────
EVALUATE_SYSTEM_PROMPT_FREEFORM = (
"You are a strict judge evaluating whether an autonomous agent has "
"achieved a user's stated goal. You receive the goal text and the "
"agent's most recent response. Your only job is to decide whether "
"the goal is fully satisfied based on that response.\n\n"
"A goal is DONE only when:\n"
"- The response explicitly confirms the goal was completed, OR\n"
"- The response clearly shows the final deliverable was produced, OR\n"
"- The response explains the goal is unachievable / blocked / needs "
"user input (treat this as DONE with reason describing the block).\n\n"
"Otherwise the goal is NOT done — CONTINUE.\n\n"
"Reply ONLY with a single JSON object on one line:\n"
'{"done": <true|false>, "reason": "<one-sentence rationale>"}'
)
EVALUATE_SYSTEM_PROMPT_CHECKLIST = (
"You are a strict judge evaluating an autonomous agent's progress on "
"a user's goal that has a detailed checklist of completion criteria. "
"For EACH currently-pending checklist item, decide whether the "
"available evidence shows the item is satisfied.\n\n"
"Be strict but not absurd. Default to leaving items pending UNLESS "
"evidence is reasonably clear. Reasonable evidence includes:\n"
"- The agent's most recent response describing or showing the work\n"
"- Tool call results visible in the conversation history (file writes, "
"command output, web requests, etc.)\n"
"- A clear statement by the agent that the work was done, when "
"supported by tool output earlier in the conversation\n\n"
"Do NOT require the agent to re-prove items it has already established "
"in earlier turns. If a tool call earlier in the conversation already "
"wrote a file, you do not need fresh `ls` output every turn — once "
"established, it's done.\n\n"
"Flip pending → completed when the response or recent tool calls show "
"the item is satisfied. Flip pending → impossible only when the work "
"demonstrates the item cannot be achieved in this environment (NOT "
"merely that the agent didn't try). Vague intentions ('I will do X "
"next') do NOT count as completion.\n\n"
"STICKINESS: items already marked completed or impossible are frozen. "
"Do not include them in your updates. Only the user can revert them.\n\n"
"TOOLS:\n"
"- ``read_file(path, offset, limit)``: inspect the dumped conversation "
"history file whose path is given in the user message. Use this when "
"the snippet alone isn't enough to rule. Each call costs tokens, so "
"only read when needed.\n"
"- ``update_checklist(updates, new_items, reason)``: issue your "
"verdict. Call this exactly once per turn when you are ready to rule. "
"Calling it ENDS the evaluation.\n\n"
"You MUST call one of these tools every turn. Do not reply with "
"prose or JSON in your message body — the system will not see "
"anything written outside tool calls. When you cite evidence, "
"reference the agent's actual output specifically."
)
EVALUATE_USER_PROMPT_CHECKLIST_TEMPLATE = (
"Goal:\n{goal}\n\n"
"Current checklist (each item is numbered, 1-based — use these "
"exact 1-based numbers as the ``index`` field in your updates):\n{checklist_block}\n\n"
"Agent's most recent response (snippet):\n{response}\n\n"
"Conversation history file (call read_file on this path if you need "
"more context — pagination supported via offset/limit):\n{history_path}\n\n"
"Evaluate each pending item. Cite specific evidence."
)
EVALUATE_USER_PROMPT_FREEFORM_TEMPLATE = (
"Goal:\n{goal}\n\n"
"Agent's most recent response:\n{response}\n\n"
"Is the goal satisfied?"
)
# ──────────────────────────────────────────────────────────────────────
# Dataclasses
# ──────────────────────────────────────────────────────────────────────
@dataclass
class ChecklistItem:
"""One concrete completion criterion attached to a goal."""
text: str
status: str = ITEM_PENDING # pending | completed | impossible
added_by: str = ADDED_BY_JUDGE # judge | user
added_at: float = 0.0
completed_at: Optional[float] = None
evidence: Optional[str] = None # judge's rationale on flip
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ChecklistItem":
text = str(data.get("text", "")).strip()
if not text:
text = "(empty item)"
status = str(data.get("status", ITEM_PENDING)).strip().lower()
if status not in VALID_ITEM_STATUSES:
status = ITEM_PENDING
added_by = str(data.get("added_by", ADDED_BY_JUDGE)).strip().lower()
if added_by not in (ADDED_BY_JUDGE, ADDED_BY_USER):
added_by = ADDED_BY_JUDGE
return cls(
text=text,
status=status,
added_by=added_by,
added_at=float(data.get("added_at", 0.0) or 0.0),
completed_at=(
float(data["completed_at"])
if data.get("completed_at") is not None
else None
),
evidence=data.get("evidence"),
)
@dataclass
class GoalState:
"""Serializable goal state stored per session."""
goal: str
status: str = "active" # active | paused | done | cleared
turns_used: int = 0
max_turns: int = DEFAULT_MAX_TURNS
created_at: float = 0.0
last_turn_at: float = 0.0
last_verdict: Optional[str] = None # "done" | "continue" | "skipped"
last_reason: Optional[str] = None
paused_reason: Optional[str] = None # why we auto-paused (budget, etc.)
consecutive_parse_failures: int = 0 # judge-output parse failures in a row
# Checklist mode (added 2026-05). Both fields default safely so old
# state_meta rows load unchanged.
checklist: List[ChecklistItem] = field(default_factory=list)
decomposed: bool = False # has Phase-A run for this goal?
def to_json(self) -> str:
data = asdict(self)
# asdict already serializes ChecklistItem via dataclass recursion.
return json.dumps(data, ensure_ascii=False)
@classmethod
def from_json(cls, raw: str) -> "GoalState":
data = json.loads(raw)
raw_checklist = data.get("checklist") or []
checklist: List[ChecklistItem] = []
if isinstance(raw_checklist, list):
for item in raw_checklist:
if isinstance(item, dict):
try:
checklist.append(ChecklistItem.from_dict(item))
except Exception:
continue
return cls(
goal=data.get("goal", ""),
status=data.get("status", "active"),
turns_used=int(data.get("turns_used", 0) or 0),
max_turns=int(data.get("max_turns", DEFAULT_MAX_TURNS) or DEFAULT_MAX_TURNS),
created_at=float(data.get("created_at", 0.0) or 0.0),
last_turn_at=float(data.get("last_turn_at", 0.0) or 0.0),
last_verdict=data.get("last_verdict"),
last_reason=data.get("last_reason"),
paused_reason=data.get("paused_reason"),
consecutive_parse_failures=int(data.get("consecutive_parse_failures", 0) or 0),
checklist=checklist,
decomposed=bool(data.get("decomposed", False)),
)
# --- checklist helpers ------------------------------------------------
def checklist_counts(self) -> Tuple[int, int, int, int]:
"""Return (total, completed, impossible, pending)."""
total = len(self.checklist)
completed = sum(1 for it in self.checklist if it.status == ITEM_COMPLETED)
impossible = sum(1 for it in self.checklist if it.status == ITEM_IMPOSSIBLE)
pending = total - completed - impossible
return total, completed, impossible, pending
def all_terminal(self) -> bool:
"""True iff at least one item exists and every item is in a terminal status."""
if not self.checklist:
return False
return all(it.status in TERMINAL_ITEM_STATUSES for it in self.checklist)
def render_checklist(self, *, numbered: bool = False) -> str:
if not self.checklist:
return "(empty)"
lines = []
for i, item in enumerate(self.checklist, start=1):
marker = ITEM_MARKERS.get(item.status, "[?]")
prefix = f"{i}. {marker}" if numbered else f" {marker}"
line = f"{prefix} {item.text}"
if item.status == ITEM_IMPOSSIBLE and item.evidence:
line += f" (impossible: {item.evidence})"
lines.append(line)
return "\n".join(lines)
# ──────────────────────────────────────────────────────────────────────
# Persistence (SessionDB state_meta)
# ──────────────────────────────────────────────────────────────────────
def _meta_key(session_id: str) -> str:
return f"goal:{session_id}"
_DB_CACHE: Dict[str, Any] = {}
def _get_session_db() -> Optional[Any]:
"""Return a SessionDB instance for the current HERMES_HOME.
SessionDB has no built-in singleton, but opening a new connection per
/goal call would thrash the file. We cache one instance per
``hermes_home`` path so profile switches still pick up the right DB.
Defensive against import/instantiation failures so tests and
non-standard launchers can still use the GoalManager.
"""
try:
from hermes_constants import get_hermes_home
from hermes_state import SessionDB
home = str(get_hermes_home())
except Exception as exc: # pragma: no cover
logger.debug("GoalManager: SessionDB bootstrap failed (%s)", exc)
return None
cached = _DB_CACHE.get(home)
if cached is not None:
return cached
try:
db = SessionDB()
except Exception as exc: # pragma: no cover
logger.debug("GoalManager: SessionDB() raised (%s)", exc)
return None
_DB_CACHE[home] = db
return db
def load_goal(session_id: str) -> Optional[GoalState]:
"""Load the goal for a session, or None if none exists."""
if not session_id:
return None
db = _get_session_db()
if db is None:
return None
try:
raw = db.get_meta(_meta_key(session_id))
except Exception as exc:
logger.debug("GoalManager: get_meta failed: %s", exc)
return None
if not raw:
return None
try:
return GoalState.from_json(raw)
except Exception as exc:
logger.warning("GoalManager: could not parse stored goal for %s: %s", session_id, exc)
return None
def save_goal(session_id: str, state: GoalState) -> None:
"""Persist a goal to SessionDB. No-op if DB unavailable."""
if not session_id:
return
db = _get_session_db()
if db is None:
return
try:
db.set_meta(_meta_key(session_id), state.to_json())
except Exception as exc:
logger.debug("GoalManager: set_meta failed: %s", exc)
def clear_goal(session_id: str) -> None:
"""Mark a goal cleared in the DB (preserved for audit, status=cleared)."""
state = load_goal(session_id)
if state is None:
return
state.status = "cleared"
save_goal(session_id, state)
# ──────────────────────────────────────────────────────────────────────
# Conversation-history dump (read by the judge tool loop)
# ──────────────────────────────────────────────────────────────────────
def _goals_dump_dir() -> Optional[Path]:
"""Return ``<HERMES_HOME>/goals`` (created on first use), or None on error."""
try:
from hermes_constants import get_hermes_home
home = Path(get_hermes_home())
except Exception as exc:
logger.debug("goals dump dir: get_hermes_home failed: %s", exc)
return None
try:
path = home / "goals"
path.mkdir(parents=True, exist_ok=True)
return path
except Exception as exc:
logger.debug("goals dump dir: mkdir failed: %s", exc)
return None
def _safe_session_filename(session_id: str) -> str:
"""Make a session_id safe for use as a filename component."""
cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", session_id or "unknown")
# Bound length to keep filesystem happy.
return cleaned[:128] or "unknown"
def conversation_dump_path(session_id: str) -> Optional[Path]:
"""Where the dumped messages JSON for ``session_id`` lives."""
base = _goals_dump_dir()
if base is None:
return None
return base / f"{_safe_session_filename(session_id)}.json"
def dump_conversation(session_id: str, messages: List[Dict[str, Any]]) -> Optional[Path]:
"""Write ``messages`` to the goals/ dump file. Returns the path on success."""
if not session_id or not messages:
return None
path = conversation_dump_path(session_id)
if path is None:
return None
try:
# Best-effort: messages may contain non-JSON-serializable objects from
# provider-specific adapter shims. Fall through with default=str.
with open(path, "w", encoding="utf-8") as fh:
json.dump(messages, fh, ensure_ascii=False, indent=2, default=str)
return path
except Exception as exc:
logger.debug("dump_conversation: write failed: %s", exc)
return None
# ──────────────────────────────────────────────────────────────────────
# Judge: parsing helpers
# ──────────────────────────────────────────────────────────────────────
def _truncate(text: str, limit: int) -> str:
if not text:
return ""
if len(text) <= limit:
return text
return text[:limit] + "… [truncated]"
_JSON_OBJECT_RE = re.compile(r"\{.*\}", re.DOTALL)
def _extract_json_object(raw: str) -> Optional[Dict[str, Any]]:
"""Best-effort extraction of a single JSON object from a possibly-prosey reply."""
if not raw:
return None
text = raw.strip()
if text.startswith("```"):
text = text.strip("`")
nl = text.find("\n")
if nl != -1:
text = text[nl + 1:]
try:
data = json.loads(text)
except Exception:
match = _JSON_OBJECT_RE.search(text)
if not match:
return None
try:
data = json.loads(match.group(0))
except Exception:
return None
return data if isinstance(data, dict) else None
def _parse_judge_response(raw: str) -> Tuple[bool, str, bool]:
"""Parse the freeform judge's reply. Fail-open to ``(False, "<reason>", parse_failed)``.
Returns ``(done, reason, parse_failed)``. ``parse_failed`` is True when the
judge returned output that couldn't be interpreted as the expected JSON
verdict (empty body, prose, malformed JSON). Callers use that flag to
auto-pause after N consecutive parse failures so a weak judge model
doesn't silently burn the turn budget.
"""
if not raw:
return False, "judge returned empty response", True
data = _extract_json_object(raw)
if data is None:
return False, f"judge reply was not JSON: {_truncate(raw, 200)!r}", True
done_val = data.get("done")
if isinstance(done_val, str):
done = done_val.strip().lower() in ("true", "yes", "1", "done")
else:
done = bool(done_val)
reason = str(data.get("reason") or "").strip()
if not reason:
reason = "no reason provided"
return done, reason, False
def _parse_decompose_response(raw: str) -> Tuple[List[Dict[str, Any]], bool]:
"""Parse a Phase-A decompose reply. Returns (items, parse_failed)."""
if not raw:
return [], True
data = _extract_json_object(raw)
if data is None:
return [], True
raw_items = data.get("checklist")
if not isinstance(raw_items, list):
return [], True
out: List[Dict[str, Any]] = []
for item in raw_items:
if isinstance(item, dict):
text = str(item.get("text", "")).strip()
if text:
out.append({"text": text})
elif isinstance(item, str):
text = item.strip()
if text:
out.append({"text": text})
return out, False
def _parse_evaluate_response(raw: str) -> Tuple[Dict[str, Any], bool]:
"""Parse a Phase-B checklist eval reply. Returns (parsed, parse_failed).
parsed = {"updates": [...], "new_items": [...], "reason": str}
"""
if not raw:
return {"updates": [], "new_items": [], "reason": "judge returned empty response"}, True
data = _extract_json_object(raw)
if data is None:
return (
{
"updates": [],
"new_items": [],
"reason": f"judge reply was not JSON: {_truncate(raw, 200)!r}",
},
True,
)
updates = data.get("updates") or []
new_items = data.get("new_items") or []
reason = str(data.get("reason") or "").strip() or "no reason provided"
norm_updates = []
if isinstance(updates, list):
for upd in updates:
if not isinstance(upd, dict):
continue
try:
# Judge sees the checklist rendered with 1-based indices
# (matches the /subgoal CLI). Convert to 0-based here so the
# apply layer can index ``state.checklist`` directly.
idx_1based = int(upd.get("index"))
except (TypeError, ValueError):
continue
idx = idx_1based - 1
status = str(upd.get("status", "")).strip().lower()
if status not in TERMINAL_ITEM_STATUSES:
# Phase-B only accepts terminal flips. Pending → pending is a no-op.
continue
evidence = str(upd.get("evidence") or "").strip() or None
norm_updates.append({"index": idx, "status": status, "evidence": evidence})
norm_new = []
if isinstance(new_items, list):
for it in new_items:
if isinstance(it, dict):
text = str(it.get("text", "")).strip()
if text:
norm_new.append({"text": text})
elif isinstance(it, str):
text = it.strip()
if text:
norm_new.append({"text": text})
return {"updates": norm_updates, "new_items": norm_new, "reason": reason}, False
# ──────────────────────────────────────────────────────────────────────
# Judge: read_file tool for the judge's bounded tool loop
# ──────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────
# Judge tool schemas: read_file (history inspection) +
# submit_checklist (Phase A) + update_checklist (Phase B)
#
# Forcing the judge to emit through tool calls is dramatically more
# reliable than asking it to reply with JSON text. Most providers
# enforce the schema server-side, so weak/small judge models can no
# longer drift into prose, markdown fences, or empty bodies.
# ──────────────────────────────────────────────────────────────────────
_JUDGE_READ_FILE_TOOL_SCHEMA: Dict[str, Any] = {
"type": "function",
"function": {
"name": "read_file",
"description": (
"Read a portion of the dumped conversation history JSON file. "
"Use this when the snippet alone isn't enough to rule. Returns "
"lines from the file with 1-based line numbers. Pagination "
"supported via offset and limit. Reads beyond a built-in cap "
"are truncated."
),
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": (
"Absolute path to the conversation history file. "
"You were given this in the user message."
),
},
"offset": {
"type": "integer",
"description": "1-indexed starting line number (default 1).",
"default": 1,
},
"limit": {
"type": "integer",
"description": (
f"Max lines to return (default {_JUDGE_READ_FILE_MAX_LINES})."
),
"default": _JUDGE_READ_FILE_MAX_LINES,
},
},
"required": ["path"],
},
},
}
_JUDGE_SUBMIT_CHECKLIST_TOOL_SCHEMA: Dict[str, Any] = {
"type": "function",
"function": {
"name": "submit_checklist",
"description": (
"Submit the harsh, detailed completion-criteria checklist you "
"decomposed the goal into. Each item is one verifiable "
"completion criterion. Bias toward more items, not fewer."
),
"parameters": {
"type": "object",
"properties": {
"items": {
"type": "array",
"description": (
"List of checklist items. Each item is a single "
"verifiable statement of fact about the finished "
"work. Aim for at least 5 items; more is better "
"when warranted."
),
"items": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "The completion-criterion text.",
},
},
"required": ["text"],
},
},
},
"required": ["items"],
},
},
}
_JUDGE_UPDATE_CHECKLIST_TOOL_SCHEMA: Dict[str, Any] = {
"type": "function",
"function": {
"name": "update_checklist",
"description": (
"Issue your verdict on the current checklist. For each "
"currently-pending item, decide whether the agent's most "
"recent response (and conversation history if you read it) "
"shows the item is satisfied. You may also append new items "
"the original decomposition missed. Call this exactly once "
"when you are ready to rule — calling it ends the evaluation."
),
"parameters": {
"type": "object",
"properties": {
"updates": {
"type": "array",
"description": (
"Per-item rulings. Use the 1-based ``index`` shown "
"in the checklist. ``status`` must be 'completed' "
"(clear evidence the item is done) or 'impossible' "
"(item cannot be achieved in this environment). "
"Items already in a terminal status are frozen — "
"do not include them."
),
"items": {
"type": "object",
"properties": {
"index": {
"type": "integer",
"description": "1-based checklist index.",
},
"status": {
"type": "string",
"enum": ["completed", "impossible"],
},
"evidence": {
"type": "string",
"description": (
"One-sentence specific citation of why "
"this item is done or impossible. "
"Reference the agent's actual output."
),
},
},
"required": ["index", "status", "evidence"],
},
},
"new_items": {
"type": "array",
"description": (
"Optional: completion criteria the original "
"decomposition missed. Stay strict — only add "
"items that genuinely belong as completion "
"criteria for this goal."
),
"items": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "The new criterion text.",
},
},
"required": ["text"],
},
},
"reason": {
"type": "string",
"description": "One-sentence overall rationale for this round of updates.",
},
},
"required": ["updates", "new_items", "reason"],
},
},
}
def _judge_read_file(
path: str,
*,
offset: int = 1,
limit: int = _JUDGE_READ_FILE_MAX_LINES,
allowed_path: Optional[Path] = None,
) -> str:
"""Bounded read of the dumped conversation file. Returns JSON-serializable text.
Restricted to ``allowed_path`` when provided — the judge cannot use this
tool to read arbitrary files.
"""
if not path:
return json.dumps({"error": "path is required"})
try:
target = Path(path).resolve()
except Exception as exc:
return json.dumps({"error": f"path resolve failed: {exc}"})
if allowed_path is not None:
try:
allowed = allowed_path.resolve()
except Exception:
allowed = allowed_path
if target != allowed:
return json.dumps({
"error": (
f"read_file is restricted to the conversation dump path. "
f"Allowed: {allowed}"
)
})
if not target.exists():
return json.dumps({"error": f"file not found: {target}"})
try:
offset = max(1, int(offset or 1))
limit = max(1, min(int(limit or _JUDGE_READ_FILE_MAX_LINES), _JUDGE_READ_FILE_MAX_LINES))
except (TypeError, ValueError):
return json.dumps({"error": "offset and limit must be integers"})
try:
with open(target, "r", encoding="utf-8", errors="replace") as fh:
lines = fh.readlines()
except Exception as exc:
return json.dumps({"error": f"read failed: {exc}"})
total = len(lines)
start = offset - 1
end = min(start + limit, total)
slice_lines = lines[start:end]
out = "".join(slice_lines)
if len(out) > _JUDGE_READ_FILE_MAX_CHARS:
out = out[:_JUDGE_READ_FILE_MAX_CHARS] + "\n… [truncated by judge read cap]"
return json.dumps({
"path": str(target),
"total_lines": total,
"offset": offset,
"returned": len(slice_lines),
"next_offset": end + 1 if end < total else None,
"content": out,
}, ensure_ascii=False)
# ──────────────────────────────────────────────────────────────────────
# Judge: phase-A (decompose) and phase-B (evaluate)
# ──────────────────────────────────────────────────────────────────────
def _get_judge_client() -> Tuple[Optional[Any], str]:
"""Return (client, model) or (None, '') when unavailable."""
try:
from agent.auxiliary_client import get_text_auxiliary_client
except Exception as exc:
logger.debug("goal judge: auxiliary client import failed: %s", exc)
return None, ""
try:
client, model = get_text_auxiliary_client("goal_judge")
except Exception as exc:
logger.debug("goal judge: get_text_auxiliary_client failed: %s", exc)
return None, ""
if client is None or not model:
return None, ""
return client, model
def _extract_tool_call(msg: Any, tool_name: str) -> Optional[Dict[str, Any]]:
"""Find a tool call by name on a chat-completions message. Returns
``{"id", "name", "arguments": <dict>}`` or None.
Robust to provider shims that return tool_calls as objects or dicts
and arguments as JSON strings or already-parsed dicts.
"""
tool_calls = getattr(msg, "tool_calls", None) or []
for tc in tool_calls:
try:
tc_id = getattr(tc, "id", None) or (tc.get("id") if isinstance(tc, dict) else None) or "tc-?"
fn = getattr(tc, "function", None) or (tc.get("function") if isinstance(tc, dict) else None)
if fn is None:
continue
fn_name = getattr(fn, "name", None) or (fn.get("name") if isinstance(fn, dict) else "")
if fn_name != tool_name:
continue
fn_args_raw = getattr(fn, "arguments", None) or (fn.get("arguments") if isinstance(fn, dict) else "")
if isinstance(fn_args_raw, str):
try:
args = json.loads(fn_args_raw) if fn_args_raw else {}
except Exception:
args = {}
elif isinstance(fn_args_raw, dict):
args = fn_args_raw
else:
args = {}
return {"id": tc_id, "name": fn_name, "arguments": args}
except Exception:
continue
return None
def _serialize_assistant_tool_calls(msg: Any) -> List[Dict[str, Any]]:
"""Convert a provider-shim tool_calls list into plain-dict form for
inclusion in subsequent ``messages=[...]`` payloads."""
out: List[Dict[str, Any]] = []
for tc in getattr(msg, "tool_calls", None) or []:
try:
tc_id = getattr(tc, "id", None) or (tc.get("id") if isinstance(tc, dict) else None) or "tc-?"
fn = getattr(tc, "function", None) or (tc.get("function") if isinstance(tc, dict) else None)
fn_name = getattr(fn, "name", None) or (fn.get("name") if isinstance(fn, dict) else "")
fn_args = getattr(fn, "arguments", None) or (fn.get("arguments") if isinstance(fn, dict) else "")
if not isinstance(fn_args, str):
try:
fn_args = json.dumps(fn_args)
except Exception:
fn_args = "{}"
out.append({
"id": tc_id,
"type": "function",
"function": {"name": fn_name or "", "arguments": fn_args},
})
except Exception:
continue
return out
def _call_judge_with_tool_choice(
client: Any,
*,
model: str,
messages: List[Dict[str, Any]],
tools: List[Dict[str, Any]],
forced_tool_name: Optional[str],
timeout: float,
max_tokens: int = 1500,
) -> Tuple[Optional[Any], Optional[str]]:
"""Call the judge with a forced tool choice, falling back to ``auto``
if the provider rejects ``required`` / a specific function choice.
Returns ``(response, error)``. On success, ``error`` is None.
"""
# First attempt: force the specific tool. Most modern providers
# support {"type": "function", "function": {"name": "..."}}.
primary_choice: Any
if forced_tool_name:
primary_choice = {"type": "function", "function": {"name": forced_tool_name}}
else:
primary_choice = "required"
attempts: List[Any] = [primary_choice, "required", "auto"]
last_err: Optional[str] = None
for choice in attempts:
try:
return client.chat.completions.create(
model=model,
messages=messages,
tools=tools,
tool_choice=choice,
temperature=0,
max_tokens=max_tokens,
timeout=timeout,
), None
except Exception as exc:
last_err = f"{type(exc).__name__}: {exc}"
# Only retry on errors that look like the provider rejecting the
# tool_choice shape. Network errors etc. should bail immediately.
msg = str(exc).lower()
if not any(token in msg for token in (
"tool_choice", "tool choice", "required", "function call",
"unsupported", "not supported", "invalid", "400",
)):
return None, last_err
logger.debug("goal judge: tool_choice=%r rejected (%s); falling back", choice, exc)
continue
return None, last_err or "all tool_choice fallbacks failed"
def decompose_goal(
goal: str,
*,
timeout: float = DEFAULT_JUDGE_TIMEOUT,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""Phase-A: ask the judge to break the goal into a checklist via a
forced ``submit_checklist`` tool call.
Returns ``(items, error)``. On any failure, returns ``([], reason)``
so the caller can fall back to freeform mode.
"""
if not goal.strip():
return [], "empty goal"
client, model = _get_judge_client()
if client is None:
return [], "auxiliary client unavailable"
messages = [
{"role": "system", "content": DECOMPOSE_SYSTEM_PROMPT},
{
"role": "user",
"content": DECOMPOSE_USER_PROMPT_TEMPLATE.format(
goal=_truncate(goal, 4000)
),
},
]
resp, err = _call_judge_with_tool_choice(
client,
model=model,
messages=messages,
tools=[_JUDGE_SUBMIT_CHECKLIST_TOOL_SCHEMA],
forced_tool_name="submit_checklist",
timeout=timeout,
max_tokens=2000,
)
if resp is None:
logger.info("goal decompose: API call failed (%s)", err)
return [], f"decompose error: {err}"
try:
msg = resp.choices[0].message
except Exception:
return [], "decompose response malformed"
tc = _extract_tool_call(msg, "submit_checklist")
if tc is None:
# Provider responded but didn't call the tool. Try parsing content
# as a last-ditch backstop so a fully-broken provider doesn't
# silently leave the user with no checklist at all.
content = getattr(msg, "content", "") or ""
items, parse_failed = _parse_decompose_response(content)
if parse_failed or not items:
logger.info(
"goal decompose: no submit_checklist tool call AND no parseable JSON (raw=%r)",
_truncate(content, 200),
)
return [], "decompose: judge did not call submit_checklist"
logger.info("goal decompose: fell back to JSON-content parser (%d items)", len(items))
return items, None
raw_items = tc["arguments"].get("items") or []
items: List[Dict[str, Any]] = []
if isinstance(raw_items, list):
for entry in raw_items:
if isinstance(entry, dict):
text = str(entry.get("text", "")).strip()
if text:
items.append({"text": text})
elif isinstance(entry, str):
text = entry.strip()
if text:
items.append({"text": text})
if not items:
logger.info("goal decompose: submit_checklist returned empty items list")
return [], "decompose: empty checklist"
logger.info("goal decompose: produced %d checklist items via tool call", len(items))
return items, None
def judge_goal_freeform(
goal: str,
last_response: str,
*,
timeout: float = DEFAULT_JUDGE_TIMEOUT,
) -> Tuple[str, str, bool]:
"""Legacy freeform judge — kept for goals with no checklist.
Returns ``(verdict, reason, parse_failed)`` where verdict is ``"done"``,
``"continue"``, or ``"skipped"``.
"""
if not goal.strip():
return "skipped", "empty goal", False
if not last_response.strip():
return "continue", "empty response (nothing to evaluate)", False
client, model = _get_judge_client()
if client is None:
return "continue", "auxiliary client unavailable", False
prompt = EVALUATE_USER_PROMPT_FREEFORM_TEMPLATE.format(
goal=_truncate(goal, 2000),
response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS),
)
try:
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": EVALUATE_SYSTEM_PROMPT_FREEFORM},
{"role": "user", "content": prompt},
],
temperature=0,
max_tokens=200,
timeout=timeout,
)
except Exception as exc:
logger.info("goal judge: API call failed (%s) — falling through to continue", exc)
return "continue", f"judge error: {type(exc).__name__}", False
try:
raw = resp.choices[0].message.content or ""
except Exception:
raw = ""
done, reason, parse_failed = _parse_judge_response(raw)
verdict = "done" if done else "continue"
logger.info("goal judge (freeform): verdict=%s reason=%s", verdict, _truncate(reason, 120))
return verdict, reason, parse_failed
def evaluate_checklist(
state: GoalState,
last_response: str,
*,
history_path: Optional[Path],
timeout: float = DEFAULT_JUDGE_TIMEOUT,
max_tool_calls: int = DEFAULT_MAX_JUDGE_TOOL_CALLS,
) -> Tuple[Dict[str, Any], bool]:
"""Phase-B: judge evaluates each pending checklist item via forced
tool calls.
The judge has two tools available:
- ``read_file``: inspect the dumped conversation history
- ``update_checklist``: issue the verdict (terminates the loop)
``tool_choice="required"`` forces one of them every iteration. We loop
until ``update_checklist`` is called or ``max_tool_calls`` is exhausted.
Returns ``(parsed, parse_failed)`` where parsed is
``{"updates": [...], "new_items": [...], "reason": str}``.
Falls open on transport errors: empty updates/new_items, parse_failed=False.
"""
client, model = _get_judge_client()
if client is None:
return ({"updates": [], "new_items": [], "reason": "auxiliary client unavailable"}, False)
# Render checklist with 1-based indices the judge addresses via the
# update_checklist tool's ``index`` field.
checklist_block = state.render_checklist(numbered=True)
user_prompt = EVALUATE_USER_PROMPT_CHECKLIST_TEMPLATE.format(
goal=_truncate(state.goal, 2000),
checklist_block=checklist_block,
response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS),
history_path=str(history_path) if history_path else "(unavailable — judge from snippet only)",
)
messages: List[Dict[str, Any]] = [
{"role": "system", "content": EVALUATE_SYSTEM_PROMPT_CHECKLIST},
{"role": "user", "content": user_prompt},
]
# Build the toolbox: read_file is only useful when we actually have a
# history file to read, so we omit it otherwise to keep the schema lean.
tools: List[Dict[str, Any]] = [_JUDGE_UPDATE_CHECKLIST_TOOL_SCHEMA]
if history_path is not None:
tools.insert(0, _JUDGE_READ_FILE_TOOL_SCHEMA)
reads_left = max(0, int(max_tool_calls)) if history_path is not None else 0
# Bound the overall loop generously — the judge will normally finish in
# one or two passes (read_file once, then update_checklist; or just
# update_checklist directly).
for iteration in range(reads_left + 2):
# When out of read budget, drop read_file from the toolbox so the
# judge MUST emit update_checklist.
loop_tools = tools if reads_left > 0 else [_JUDGE_UPDATE_CHECKLIST_TOOL_SCHEMA]
# Forcing update_checklist directly when reads are exhausted gives
# us the strongest guarantee of termination.
forced = "update_checklist" if reads_left <= 0 else None
resp, err = _call_judge_with_tool_choice(
client,
model=model,
messages=messages,
tools=loop_tools,
forced_tool_name=forced,
timeout=timeout,
max_tokens=1500,
)
if resp is None:
logger.info("goal judge (checklist): API call failed (%s)", err)
return (
{
"updates": [],
"new_items": [],
"reason": f"judge error: {err}",
},
False,
)
try:
msg = resp.choices[0].message
except Exception:
return (
{"updates": [], "new_items": [], "reason": "judge response malformed"},
True,
)
# Did the judge call update_checklist? If yes, we're done.
update_tc = _extract_tool_call(msg, "update_checklist")
if update_tc is not None:
parsed = _normalize_update_args(update_tc["arguments"])
logger.info(
"goal judge (checklist): updates=%d new_items=%d reason=%s",
len(parsed.get("updates") or []),
len(parsed.get("new_items") or []),
_truncate(parsed.get("reason", ""), 120),
)
return parsed, False
# Did the judge call read_file? If yes, run it and feed the result back.
read_tc = _extract_tool_call(msg, "read_file")
if read_tc is not None and reads_left > 0:
args = read_tc["arguments"]
tool_result = _judge_read_file(
str(args.get("path", "")),
offset=args.get("offset", 1),
limit=args.get("limit", _JUDGE_READ_FILE_MAX_LINES),
allowed_path=history_path,
)
messages.append({
"role": "assistant",
"content": getattr(msg, "content", "") or "",
"tool_calls": _serialize_assistant_tool_calls(msg),
})
messages.append({
"role": "tool",
"tool_call_id": read_tc["id"],
"name": "read_file",
"content": tool_result,
})
reads_left -= 1
continue
# Neither tool was called. Try parsing the content body as a last-
# ditch backstop, then bail.
content = getattr(msg, "content", "") or ""
if content.strip():
parsed, parse_failed = _parse_evaluate_response(content)
if not parse_failed:
logger.info(
"goal judge (checklist): fell back to JSON-content parser "
"updates=%d new_items=%d",
len(parsed.get("updates") or []),
len(parsed.get("new_items") or []),
)
return parsed, False
logger.info(
"goal judge (checklist): judge emitted neither read_file nor "
"update_checklist (iteration=%d, content=%r) — bailing",
iteration, _truncate(content, 120),
)
return (
{
"updates": [],
"new_items": [],
"reason": "judge did not call update_checklist",
},
True,
)
# Loop exhausted without an update_checklist call.
return (
{
"updates": [],
"new_items": [],
"reason": "judge tool-loop exhausted without verdict",
},
True,
)
def _normalize_update_args(args: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and normalize the ``update_checklist`` tool arguments.
Performs the same 1-based → 0-based conversion and terminal-status
filter as ``_parse_evaluate_response``. Returns the canonical
``{updates, new_items, reason}`` shape callers expect.
"""
raw_updates = args.get("updates") or []
raw_new = args.get("new_items") or []
reason = str(args.get("reason") or "").strip() or "no reason provided"
norm_updates: List[Dict[str, Any]] = []
if isinstance(raw_updates, list):
for upd in raw_updates:
if not isinstance(upd, dict):
continue
try:
idx_1based = int(upd.get("index"))
except (TypeError, ValueError):
continue
status = str(upd.get("status", "")).strip().lower()
if status not in TERMINAL_ITEM_STATUSES:
continue
evidence = str(upd.get("evidence") or "").strip() or None
norm_updates.append({
"index": idx_1based - 1, # 1-based → 0-based for apply layer
"status": status,
"evidence": evidence,
})
norm_new: List[Dict[str, Any]] = []
if isinstance(raw_new, list):
for it in raw_new:
if isinstance(it, dict):
text = str(it.get("text", "")).strip()
if text:
norm_new.append({"text": text})
elif isinstance(it, str):
text = it.strip()
if text:
norm_new.append({"text": text})
return {"updates": norm_updates, "new_items": norm_new, "reason": reason}
# ──────────────────────────────────────────────────────────────────────
# GoalManager — the orchestration surface CLI + gateway talk to
# ──────────────────────────────────────────────────────────────────────
class GoalManager:
"""Per-session goal state + continuation decisions.
The CLI and gateway each hold one ``GoalManager`` per live session.
Methods:
- ``set(goal)`` — start a new standing goal.
- ``clear()`` — remove the active goal.
- ``pause()`` / ``resume()`` — explicit user controls.
- ``status()`` — printable one-liner.
- ``add_subgoal(text)`` — user appends a checklist item.
- ``mark_subgoal(index, status)`` — user flips an item (override).
- ``remove_subgoal(index)`` — user deletes an item.
- ``clear_checklist()`` — user wipes the checklist; next turn re-decomposes.
- ``evaluate_after_turn(last_response, agent=None)`` — call the judge,
update state, return a decision dict.
- ``next_continuation_prompt()`` — the canonical user-role message to
feed back into ``run_conversation``.
"""
def __init__(self, session_id: str, *, default_max_turns: int = DEFAULT_MAX_TURNS):
self.session_id = session_id
self.default_max_turns = int(default_max_turns or DEFAULT_MAX_TURNS)
self._state: Optional[GoalState] = load_goal(session_id)
# --- introspection ------------------------------------------------
@property
def state(self) -> Optional[GoalState]:
return self._state
def is_active(self) -> bool:
return self._state is not None and self._state.status == "active"
def has_goal(self) -> bool:
return self._state is not None and self._state.status in ("active", "paused")
def status_line(self) -> str:
s = self._state
if s is None or s.status in ("cleared",):
return "No active goal. Set one with /goal <text>."
turns = f"{s.turns_used}/{s.max_turns} turns"
cl_total, cl_done, cl_imp, _ = s.checklist_counts()
cl_text = ""
if cl_total:
cl_text = f", {cl_done + cl_imp}/{cl_total} done"
if s.status == "active":
return f"⊙ Goal (active, {turns}{cl_text}): {s.goal}"
if s.status == "paused":
extra = f"{s.paused_reason}" if s.paused_reason else ""
return f"⏸ Goal (paused, {turns}{cl_text}{extra}): {s.goal}"
if s.status == "done":
return f"✓ Goal done ({turns}{cl_text}): {s.goal}"
return f"Goal ({s.status}, {turns}{cl_text}): {s.goal}"
def render_checklist(self) -> str:
"""Public helper for the /subgoal slash command."""
if self._state is None:
return "(no active goal)"
if not self._state.checklist:
return "(checklist empty — judge will populate it on the next turn)"
return self._state.render_checklist(numbered=True)
# --- mutation -----------------------------------------------------
def set(self, goal: str, *, max_turns: Optional[int] = None) -> GoalState:
goal = (goal or "").strip()
if not goal:
raise ValueError("goal text is empty")
state = GoalState(
goal=goal,
status="active",
turns_used=0,
max_turns=int(max_turns) if max_turns else self.default_max_turns,
created_at=time.time(),
last_turn_at=0.0,
checklist=[],
decomposed=False,
)
self._state = state
save_goal(self.session_id, state)
return state
def pause(self, reason: str = "user-paused") -> Optional[GoalState]:
if not self._state:
return None
self._state.status = "paused"
self._state.paused_reason = reason
save_goal(self.session_id, self._state)
return self._state
def resume(self, *, reset_budget: bool = True) -> Optional[GoalState]:
if not self._state:
return None
self._state.status = "active"
self._state.paused_reason = None
if reset_budget:
self._state.turns_used = 0
save_goal(self.session_id, self._state)
return self._state
def clear(self) -> None:
if self._state is None:
return
self._state.status = "cleared"
save_goal(self.session_id, self._state)
self._state = None
def mark_done(self, reason: str) -> None:
if not self._state:
return
self._state.status = "done"
self._state.last_verdict = "done"
self._state.last_reason = reason
save_goal(self.session_id, self._state)
# --- /subgoal user controls ---------------------------------------
def add_subgoal(self, text: str) -> ChecklistItem:
"""User appends a new checklist item. Requires an active or paused goal."""
if self._state is None:
raise RuntimeError("no active goal")
text = (text or "").strip()
if not text:
raise ValueError("subgoal text is empty")
item = ChecklistItem(
text=text,
status=ITEM_PENDING,
added_by=ADDED_BY_USER,
added_at=time.time(),
)
self._state.checklist.append(item)
save_goal(self.session_id, self._state)
return item
def mark_subgoal(self, index_1based: int, status: str) -> ChecklistItem:
"""User overrides an item's status.
``status`` may be ``completed``, ``impossible``, or ``pending``
(the last only as an undo flow). Stickiness rules do NOT apply to
user actions — the user is the only authority that can revert
terminal items.
"""
if self._state is None:
raise RuntimeError("no active goal")
status = (status or "").strip().lower()
if status not in VALID_ITEM_STATUSES:
raise ValueError(
f"status must be one of {sorted(VALID_ITEM_STATUSES)}; got {status!r}"
)
idx = int(index_1based) - 1
if idx < 0 or idx >= len(self._state.checklist):
raise IndexError(
f"index out of range (1..{len(self._state.checklist)})"
)
item = self._state.checklist[idx]
item.status = status
if status in TERMINAL_ITEM_STATUSES:
item.completed_at = time.time()
if not item.evidence:
item.evidence = "marked by user"
else:
item.completed_at = None
# Don't wipe judge-supplied evidence on undo — useful audit trail.
save_goal(self.session_id, self._state)
return item
def remove_subgoal(self, index_1based: int) -> ChecklistItem:
if self._state is None:
raise RuntimeError("no active goal")
idx = int(index_1based) - 1
if idx < 0 or idx >= len(self._state.checklist):
raise IndexError(
f"index out of range (1..{len(self._state.checklist)})"
)
removed = self._state.checklist.pop(idx)
save_goal(self.session_id, self._state)
return removed
def clear_checklist(self) -> None:
"""Wipe the checklist and reset decomposed=False so the judge re-decomposes."""
if self._state is None:
return
self._state.checklist = []
self._state.decomposed = False
save_goal(self.session_id, self._state)
# --- the main entry point called after every turn -----------------
def evaluate_after_turn(
self,
last_response: str,
*,
user_initiated: bool = True,
agent: Any = None,
messages: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Run the judge and update state. Return a decision dict.
``user_initiated`` distinguishes a real user prompt (True) from a
continuation prompt we fed ourselves (False). Both increment
``turns_used`` because both consume model budget.
``messages`` is the agent's full conversation list for this session.
When provided, it's dumped to ``<HERMES_HOME>/goals/<sid>.json`` so
the Phase-B judge's read_file tool can inspect history. Optional —
when missing, the judge runs from the snippet only.
``agent`` is a back-compat path — when ``messages`` is None we try
to extract them from common AIAgent attribute names. Most callers
should pass ``messages`` directly because AIAgent does not store
the message list as a public instance attribute.
Decision keys:
- ``status``: current goal status after update
- ``should_continue``: bool — caller should fire another turn
- ``continuation_prompt``: str or None
- ``verdict``: "done" | "continue" | "skipped" | "inactive" | "decompose"
- ``reason``: str
- ``message``: user-visible one-liner to print/send
"""
state = self._state
if state is None or state.status != "active":
return {
"status": state.status if state else None,
"should_continue": False,
"continuation_prompt": None,
"verdict": "inactive",
"reason": "no active goal",
"message": "",
}
# Count the turn that just finished.
state.turns_used += 1
state.last_turn_at = time.time()
# ── Phase A: decompose (first call after /goal set) ───────────
if not state.decomposed:
items, err = decompose_goal(state.goal)
state.decomposed = True
decompose_message = ""
if items:
now = time.time()
for entry in items:
state.checklist.append(
ChecklistItem(
text=entry["text"],
status=ITEM_PENDING,
added_by=ADDED_BY_JUDGE,
added_at=now,
)
)
state.last_verdict = "decompose"
state.last_reason = f"decomposed into {len(items)} items"
decompose_message = (
f"⊙ Goal checklist created ({len(items)} items). "
f"Use /subgoal to view or edit it."
)
save_goal(self.session_id, state)
return {
"status": "active",
"should_continue": True,
"continuation_prompt": self.next_continuation_prompt(),
"verdict": "decompose",
"reason": state.last_reason,
"message": decompose_message,
}
# Decompose failed — fall through to freeform mode below.
logger.info("goal: decompose failed (%s) — falling back to freeform judge", err)
state.last_reason = f"decompose failed: {err}"
# ── Phase B: evaluate ────────────────────────────────────────
verdict, reason, parse_failed = self._evaluate_state_phase_b(
state, last_response, agent=agent, messages=messages
)
state.last_verdict = verdict
state.last_reason = reason
# Track consecutive judge parse failures. Reset on any usable reply,
# including API / transport errors (parse_failed=False) so a flaky
# network doesn't trip the auto-pause meant for bad judge models.
if parse_failed:
state.consecutive_parse_failures += 1
else:
state.consecutive_parse_failures = 0
if verdict == "done":
state.status = "done"
save_goal(self.session_id, state)
return {
"status": "done",
"should_continue": False,
"continuation_prompt": None,
"verdict": "done",
"reason": reason,
"message": f"✓ Goal achieved: {reason}",
}
# Auto-pause when the judge model can't produce the expected JSON
# verdict N turns in a row.
if state.consecutive_parse_failures >= DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES:
state.status = "paused"
state.paused_reason = (
f"judge model returned unparseable output {state.consecutive_parse_failures} turns in a row"
)
save_goal(self.session_id, state)
return {
"status": "paused",
"should_continue": False,
"continuation_prompt": None,
"verdict": "continue",
"reason": reason,
"message": (
f"⏸ Goal paused — the judge model ({state.consecutive_parse_failures} turns) "
"isn't returning the required JSON verdict. Route the judge to a stricter "
"model in ~/.hermes/config.yaml:\n"
" auxiliary:\n"
" goal_judge:\n"
" provider: openrouter\n"
" model: google/gemini-3-flash-preview\n"
"Then /goal resume to continue."
),
}
if state.turns_used >= state.max_turns:
state.status = "paused"
state.paused_reason = f"turn budget exhausted ({state.turns_used}/{state.max_turns})"
save_goal(self.session_id, state)
return {
"status": "paused",
"should_continue": False,
"continuation_prompt": None,
"verdict": "continue",
"reason": reason,
"message": (
f"⏸ Goal paused — {state.turns_used}/{state.max_turns} turns used. "
"Use /goal resume to keep going, or /goal clear to stop."
),
}
save_goal(self.session_id, state)
cl_total, cl_done, cl_imp, _ = state.checklist_counts()
progress = ""
if cl_total:
progress = f"{cl_done + cl_imp}/{cl_total} done"
return {
"status": "active",
"should_continue": True,
"continuation_prompt": self.next_continuation_prompt(),
"verdict": "continue",
"reason": reason,
"message": (
f"↻ Continuing toward goal ({state.turns_used}/{state.max_turns}{progress}): {reason}"
),
}
def _evaluate_state_phase_b(
self,
state: GoalState,
last_response: str,
*,
agent: Any = None,
messages: Optional[List[Dict[str, Any]]] = None,
) -> Tuple[str, str, bool]:
"""Run the right kind of Phase-B evaluation given current state.
With a non-empty checklist: harsh per-item evaluation with a bounded
read_file tool loop.
With an empty checklist (e.g. decompose failed twice): fall back to
the legacy freeform judge so the goal still has a way to terminate.
"""
if not last_response.strip():
return "continue", "empty response (nothing to evaluate)", False
if state.checklist:
# Dump conversation history if we have one. Prefer explicit
# ``messages`` arg (most reliable); fall back to extracting from
# the agent instance for back-compat.
history_path: Optional[Path] = None
msgs: List[Dict[str, Any]] = []
if messages:
msgs = list(messages)
elif agent is not None:
msgs = self._extract_agent_messages(agent)
if msgs:
history_path = dump_conversation(self.session_id, msgs)
if history_path is None:
logger.debug(
"goal: conversation dump failed for session %s",
self.session_id,
)
else:
logger.debug(
"goal: no messages available for session %s — judge will run from snippet only",
self.session_id,
)
parsed, parse_failed = evaluate_checklist(
state, last_response, history_path=history_path
)
self._apply_checklist_updates(state, parsed)
if state.all_terminal():
return "done", parsed.get("reason") or "all checklist items terminal", parse_failed
return "continue", parsed.get("reason") or "checklist progress", parse_failed
# No checklist — freeform fallback.
verdict, reason, parse_failed = judge_goal_freeform(state.goal, last_response)
return verdict, reason, parse_failed
# --- internal helpers ---------------------------------------------
@staticmethod
def _extract_agent_messages(agent: Any) -> List[Dict[str, Any]]:
"""Best-effort extraction of the agent's conversation history.
Tries common attribute names so we don't tightly couple to AIAgent.
Returns an empty list when nothing is available.
"""
for attr in ("messages", "conversation_history", "_messages", "history"):
try:
msgs = getattr(agent, attr, None)
if isinstance(msgs, list) and msgs:
return msgs
except Exception:
continue
return []
@staticmethod
def _apply_checklist_updates(state: GoalState, parsed: Dict[str, Any]) -> None:
"""Apply judge updates with stickiness: never regress terminal items."""
now = time.time()
for upd in parsed.get("updates") or []:
try:
idx = int(upd["index"])
except (KeyError, TypeError, ValueError):
continue
if idx < 0 or idx >= len(state.checklist):
continue
item = state.checklist[idx]
if item.status in TERMINAL_ITEM_STATUSES:
# Stickiness: judge cannot regress a terminal item.
continue
new_status = upd.get("status")
if new_status not in TERMINAL_ITEM_STATUSES:
continue
item.status = new_status
item.completed_at = now
evidence = upd.get("evidence")
if evidence:
item.evidence = evidence
for new_item in parsed.get("new_items") or []:
text = (new_item.get("text") or "").strip()
if not text:
continue
state.checklist.append(
ChecklistItem(
text=text,
status=ITEM_PENDING,
added_by=ADDED_BY_JUDGE,
added_at=now,
)
)
# --- continuation prompt ------------------------------------------
def next_continuation_prompt(self) -> Optional[str]:
if not self._state or self._state.status != "active":
return None
if not self._state.checklist:
return CONTINUATION_PROMPT_TEMPLATE.format(goal=self._state.goal)
cl_total, cl_done, cl_imp, _ = self._state.checklist_counts()
return CONTINUATION_PROMPT_WITH_CHECKLIST_TEMPLATE.format(
goal=self._state.goal,
done=cl_done + cl_imp,
total=cl_total,
checklist=self._state.render_checklist(numbered=False),
)
# Public name kept for back-compat with the previous freeform-only API.
def judge_goal(
goal: str,
last_response: str,
*,
timeout: float = DEFAULT_JUDGE_TIMEOUT,
) -> Tuple[str, str, bool]:
"""Back-compat wrapper — defers to the freeform judge."""
return judge_goal_freeform(goal, last_response, timeout=timeout)
__all__ = [
"ChecklistItem",
"GoalState",
"GoalManager",
"CONTINUATION_PROMPT_TEMPLATE",
"CONTINUATION_PROMPT_WITH_CHECKLIST_TEMPLATE",
"DEFAULT_MAX_TURNS",
"DEFAULT_MAX_JUDGE_TOOL_CALLS",
"ITEM_PENDING",
"ITEM_COMPLETED",
"ITEM_IMPOSSIBLE",
"ITEM_MARKERS",
"TERMINAL_ITEM_STATUSES",
"VALID_ITEM_STATUSES",
"load_goal",
"save_goal",
"clear_goal",
"judge_goal",
"judge_goal_freeform",
"decompose_goal",
"evaluate_checklist",
"conversation_dump_path",
"dump_conversation",
]