"""Persistent session goals — the Ralph loop for Hermes. A goal is a free-form user objective that stays active across turns. After each turn completes, a small judge call asks an auxiliary model "is this goal satisfied by the assistant's last response?". If not, Hermes feeds a continuation prompt back into the same session and keeps working until the goal is done, turn budget is exhausted, the user pauses/clears it, or the user sends a new message (which takes priority and pauses the goal loop). Checklist mode (added 2026-05): when a goal is set, a Phase-A "decompose" call asks the judge to write an extremely detailed checklist of concrete completion criteria for that goal. On every subsequent turn (Phase B) the judge evaluates the agent's most recent output against EACH pending item and may flip pending → completed | impossible, or append new items it discovers along the way. The goal is done only when every checklist item is in a terminal status. This is much harsher than the freeform "is the goal done?" prompt and gives users a visible, verifiable progress surface via /subgoal. A bounded read_file tool loop lets the judge inspect the dumped conversation history when the snippet alone isn't enough to rule. State is persisted in SessionDB's ``state_meta`` table keyed by ``goal:`` so ``/resume`` picks it up. Design notes / invariants: - The continuation prompt is just a normal user message appended to the session via ``run_conversation``. No system-prompt mutation, no toolset swap — prompt caching stays intact. - Judge failures are fail-OPEN: ``continue``. A broken judge must not wedge progress; the turn budget is the backstop. - When a real user message arrives mid-loop it preempts the continuation prompt and also pauses the goal loop for that turn (we still re-judge after, so if the user's message happens to complete the goal the judge will say ``done``). - Stickiness: once an item is marked completed or impossible, only the user (via /subgoal undo) can flip it back. Judge updates that try to regress terminal items are silently ignored. - This module has zero hard dependency on ``cli.HermesCLI`` or the gateway runner — both wire the same ``GoalManager`` in. Nothing in this module touches the agent's system prompt or toolset. """ from __future__ import annotations import json import logging import os import re import time from dataclasses import dataclass, field, asdict from pathlib import Path from typing import Any, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────────────── # Constants & defaults # ────────────────────────────────────────────────────────────────────── DEFAULT_MAX_TURNS = 20 DEFAULT_JUDGE_TIMEOUT = 60.0 # Cap how much of the last response we send to the judge inline. The judge # can read the dumped conversation file via read_file if it needs more. _JUDGE_RESPONSE_SNIPPET_CHARS = 4000 # After this many consecutive judge *parse* failures (empty output / non-JSON), # the loop auto-pauses and points the user at the goal_judge config. API / # transport errors do NOT count toward this — those are transient. This guards # against small models (e.g. deepseek-v4-flash) that cannot follow the strict # JSON reply contract; without it the loop runs until the turn budget is # exhausted with every reply shaped like `judge returned empty response` or # `judge reply was not JSON`. DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES = 3 # Bound the Phase-B judge tool loop: if the judge keeps calling read_file # without ever emitting a verdict, cap it so we don't burn the model's budget. DEFAULT_MAX_JUDGE_TOOL_CALLS = 5 # Cap a single read_file response so a judge that tries to read 100k lines # doesn't blow up its own context. Judge can paginate if needed. _JUDGE_READ_FILE_MAX_LINES = 400 _JUDGE_READ_FILE_MAX_CHARS = 32_000 # Status constants ──────────────────────────────────────────────────── ITEM_PENDING = "pending" ITEM_COMPLETED = "completed" ITEM_IMPOSSIBLE = "impossible" TERMINAL_ITEM_STATUSES = frozenset({ITEM_COMPLETED, ITEM_IMPOSSIBLE}) VALID_ITEM_STATUSES = frozenset({ITEM_PENDING, ITEM_COMPLETED, ITEM_IMPOSSIBLE}) ITEM_MARKERS = { ITEM_COMPLETED: "[x]", ITEM_IMPOSSIBLE: "[!]", ITEM_PENDING: "[ ]", } ADDED_BY_JUDGE = "judge" ADDED_BY_USER = "user" # ────────────────────────────────────────────────────────────────────── # Continuation prompt # ────────────────────────────────────────────────────────────────────── CONTINUATION_PROMPT_TEMPLATE = ( "[Continuing toward your standing goal]\n" "Goal: {goal}\n\n" "Continue working toward this goal. Take the next concrete step. " "If you believe the goal is complete, state so explicitly and stop. " "If you are blocked and need input from the user, say so clearly and stop." ) CONTINUATION_PROMPT_WITH_CHECKLIST_TEMPLATE = ( "[Continuing toward your standing goal]\n" "Goal: {goal}\n\n" "Checklist progress ({done}/{total} done):\n" "{checklist}\n\n" "Work on the unchecked items above. Do not declare items done yourself " "— a judge marks them based on evidence in your output. If an item is " "genuinely impossible in this environment, explain why so the judge can " "mark it impossible. If you are blocked on a remaining item and need " "user input, say so clearly and stop." ) # ────────────────────────────────────────────────────────────────────── # Phase-A: decompose prompts # ────────────────────────────────────────────────────────────────────── DECOMPOSE_SYSTEM_PROMPT = ( "You are a strict judge for an autonomous agent. Your first job, before " "judging anything, is to break the user's stated goal into an EXTREMELY " "DETAILED checklist of concrete, verifiable completion criteria. Each " "item must be specific enough that a third party reading the agent's " "output could decide unambiguously whether that item was achieved.\n\n" "Be exhaustive. Bias toward MORE items, not fewer. Include sub-items, " "edge cases, quality bars, deployment steps, verification checks, and " "anything the user would reasonably expect from a goal of this type. " "If the user said 'build me a website' you should be enumerating " "homepage exists, navigation links work, content is non-placeholder, " "mobile responsive, accessibility tags present, deployed somewhere " "publicly accessible, domain/URL is functional, etc. Better to " "over-specify and let a few items get marked impossible than to " "under-specify and let the agent declare victory early.\n\n" "Reply ONLY with a single JSON object on one line:\n" '{"checklist": [{"text": ""}, {"text": ""}, ...]}' ) DECOMPOSE_USER_PROMPT_TEMPLATE = ( "Goal:\n{goal}\n\n" "Produce the harshest, most detailed checklist of completion criteria " "you can. Aim for at least 5 items; more is better when warranted. " "Each item should be a single verifiable statement of fact about the " "finished work." ) # ────────────────────────────────────────────────────────────────────── # Phase-B: evaluate prompts # ────────────────────────────────────────────────────────────────────── EVALUATE_SYSTEM_PROMPT_FREEFORM = ( "You are a strict judge evaluating whether an autonomous agent has " "achieved a user's stated goal. You receive the goal text and the " "agent's most recent response. Your only job is to decide whether " "the goal is fully satisfied based on that response.\n\n" "A goal is DONE only when:\n" "- The response explicitly confirms the goal was completed, OR\n" "- The response clearly shows the final deliverable was produced, OR\n" "- The response explains the goal is unachievable / blocked / needs " "user input (treat this as DONE with reason describing the block).\n\n" "Otherwise the goal is NOT done — CONTINUE.\n\n" "Reply ONLY with a single JSON object on one line:\n" '{"done": , "reason": ""}' ) EVALUATE_SYSTEM_PROMPT_CHECKLIST = ( "You are a strict judge evaluating an autonomous agent's progress on " "a user's goal that has a detailed checklist of completion criteria. " "For EACH currently-pending checklist item, decide whether the " "available evidence shows the item is satisfied.\n\n" "Be strict but not absurd. Default to leaving items pending UNLESS " "evidence is reasonably clear. Reasonable evidence includes:\n" "- The agent's most recent response describing or showing the work\n" "- Tool call results visible in the conversation history (file writes, " "command output, web requests, etc.)\n" "- A clear statement by the agent that the work was done, when " "supported by tool output earlier in the conversation\n\n" "Do NOT require the agent to re-prove items it has already established " "in earlier turns. If a tool call earlier in the conversation already " "wrote a file, you do not need fresh `ls` output every turn — once " "established, it's done.\n\n" "Flip pending → completed when the response or recent tool calls show " "the item is satisfied. Flip pending → impossible only when the work " "demonstrates the item cannot be achieved in this environment (NOT " "merely that the agent didn't try). Vague intentions ('I will do X " "next') do NOT count as completion.\n\n" "You may APPEND new checklist items if the agent's work reveals " "criteria the original decomposition missed. Stay strict — only add " "items that genuinely belong as completion criteria.\n\n" "STICKINESS: items already marked completed or impossible are frozen. " "Do not include them in your updates. Only the user can revert them.\n\n" "TOOLS: you have read_file(path, offset, limit) available. The full " "conversation history for this session is dumped to a JSON file whose " "path is given in the user message. Call read_file on it when the " "snippet is ambiguous, when you need to verify a tool call actually " "happened, or when you want to see what skills the agent loaded. " "Otherwise, judge from the snippet directly — extra reads cost tokens.\n\n" "When you are ready to rule, reply ONLY with a single JSON object:\n" '{"updates": [{"index": , "status": "completed|impossible", "evidence": ""}, ...], ' '"new_items": [{"text": ""}, ...], ' '"reason": ""}\n' "When citing evidence, reference the agent's actual output specifically. " "Empty updates is fine. Empty new_items is fine. The reason field is required." ) EVALUATE_USER_PROMPT_CHECKLIST_TEMPLATE = ( "Goal:\n{goal}\n\n" "Current checklist (each item is numbered, 1-based — use these " "exact 1-based numbers as the ``index`` field in your updates):\n{checklist_block}\n\n" "Agent's most recent response (snippet):\n{response}\n\n" "Conversation history file (call read_file on this path if you need " "more context — pagination supported via offset/limit):\n{history_path}\n\n" "Evaluate each pending item. Cite specific evidence." ) EVALUATE_USER_PROMPT_FREEFORM_TEMPLATE = ( "Goal:\n{goal}\n\n" "Agent's most recent response:\n{response}\n\n" "Is the goal satisfied?" ) # ────────────────────────────────────────────────────────────────────── # Dataclasses # ────────────────────────────────────────────────────────────────────── @dataclass class ChecklistItem: """One concrete completion criterion attached to a goal.""" text: str status: str = ITEM_PENDING # pending | completed | impossible added_by: str = ADDED_BY_JUDGE # judge | user added_at: float = 0.0 completed_at: Optional[float] = None evidence: Optional[str] = None # judge's rationale on flip def to_dict(self) -> Dict[str, Any]: return asdict(self) @classmethod def from_dict(cls, data: Dict[str, Any]) -> "ChecklistItem": text = str(data.get("text", "")).strip() if not text: text = "(empty item)" status = str(data.get("status", ITEM_PENDING)).strip().lower() if status not in VALID_ITEM_STATUSES: status = ITEM_PENDING added_by = str(data.get("added_by", ADDED_BY_JUDGE)).strip().lower() if added_by not in (ADDED_BY_JUDGE, ADDED_BY_USER): added_by = ADDED_BY_JUDGE return cls( text=text, status=status, added_by=added_by, added_at=float(data.get("added_at", 0.0) or 0.0), completed_at=( float(data["completed_at"]) if data.get("completed_at") is not None else None ), evidence=data.get("evidence"), ) @dataclass class GoalState: """Serializable goal state stored per session.""" goal: str status: str = "active" # active | paused | done | cleared turns_used: int = 0 max_turns: int = DEFAULT_MAX_TURNS created_at: float = 0.0 last_turn_at: float = 0.0 last_verdict: Optional[str] = None # "done" | "continue" | "skipped" last_reason: Optional[str] = None paused_reason: Optional[str] = None # why we auto-paused (budget, etc.) consecutive_parse_failures: int = 0 # judge-output parse failures in a row # Checklist mode (added 2026-05). Both fields default safely so old # state_meta rows load unchanged. checklist: List[ChecklistItem] = field(default_factory=list) decomposed: bool = False # has Phase-A run for this goal? def to_json(self) -> str: data = asdict(self) # asdict already serializes ChecklistItem via dataclass recursion. return json.dumps(data, ensure_ascii=False) @classmethod def from_json(cls, raw: str) -> "GoalState": data = json.loads(raw) raw_checklist = data.get("checklist") or [] checklist: List[ChecklistItem] = [] if isinstance(raw_checklist, list): for item in raw_checklist: if isinstance(item, dict): try: checklist.append(ChecklistItem.from_dict(item)) except Exception: continue return cls( goal=data.get("goal", ""), status=data.get("status", "active"), turns_used=int(data.get("turns_used", 0) or 0), max_turns=int(data.get("max_turns", DEFAULT_MAX_TURNS) or DEFAULT_MAX_TURNS), created_at=float(data.get("created_at", 0.0) or 0.0), last_turn_at=float(data.get("last_turn_at", 0.0) or 0.0), last_verdict=data.get("last_verdict"), last_reason=data.get("last_reason"), paused_reason=data.get("paused_reason"), consecutive_parse_failures=int(data.get("consecutive_parse_failures", 0) or 0), checklist=checklist, decomposed=bool(data.get("decomposed", False)), ) # --- checklist helpers ------------------------------------------------ def checklist_counts(self) -> Tuple[int, int, int, int]: """Return (total, completed, impossible, pending).""" total = len(self.checklist) completed = sum(1 for it in self.checklist if it.status == ITEM_COMPLETED) impossible = sum(1 for it in self.checklist if it.status == ITEM_IMPOSSIBLE) pending = total - completed - impossible return total, completed, impossible, pending def all_terminal(self) -> bool: """True iff at least one item exists and every item is in a terminal status.""" if not self.checklist: return False return all(it.status in TERMINAL_ITEM_STATUSES for it in self.checklist) def render_checklist(self, *, numbered: bool = False) -> str: if not self.checklist: return "(empty)" lines = [] for i, item in enumerate(self.checklist, start=1): marker = ITEM_MARKERS.get(item.status, "[?]") prefix = f"{i}. {marker}" if numbered else f" {marker}" line = f"{prefix} {item.text}" if item.status == ITEM_IMPOSSIBLE and item.evidence: line += f" (impossible: {item.evidence})" lines.append(line) return "\n".join(lines) # ────────────────────────────────────────────────────────────────────── # Persistence (SessionDB state_meta) # ────────────────────────────────────────────────────────────────────── def _meta_key(session_id: str) -> str: return f"goal:{session_id}" _DB_CACHE: Dict[str, Any] = {} def _get_session_db() -> Optional[Any]: """Return a SessionDB instance for the current HERMES_HOME. SessionDB has no built-in singleton, but opening a new connection per /goal call would thrash the file. We cache one instance per ``hermes_home`` path so profile switches still pick up the right DB. Defensive against import/instantiation failures so tests and non-standard launchers can still use the GoalManager. """ try: from hermes_constants import get_hermes_home from hermes_state import SessionDB home = str(get_hermes_home()) except Exception as exc: # pragma: no cover logger.debug("GoalManager: SessionDB bootstrap failed (%s)", exc) return None cached = _DB_CACHE.get(home) if cached is not None: return cached try: db = SessionDB() except Exception as exc: # pragma: no cover logger.debug("GoalManager: SessionDB() raised (%s)", exc) return None _DB_CACHE[home] = db return db def load_goal(session_id: str) -> Optional[GoalState]: """Load the goal for a session, or None if none exists.""" if not session_id: return None db = _get_session_db() if db is None: return None try: raw = db.get_meta(_meta_key(session_id)) except Exception as exc: logger.debug("GoalManager: get_meta failed: %s", exc) return None if not raw: return None try: return GoalState.from_json(raw) except Exception as exc: logger.warning("GoalManager: could not parse stored goal for %s: %s", session_id, exc) return None def save_goal(session_id: str, state: GoalState) -> None: """Persist a goal to SessionDB. No-op if DB unavailable.""" if not session_id: return db = _get_session_db() if db is None: return try: db.set_meta(_meta_key(session_id), state.to_json()) except Exception as exc: logger.debug("GoalManager: set_meta failed: %s", exc) def clear_goal(session_id: str) -> None: """Mark a goal cleared in the DB (preserved for audit, status=cleared).""" state = load_goal(session_id) if state is None: return state.status = "cleared" save_goal(session_id, state) # ────────────────────────────────────────────────────────────────────── # Conversation-history dump (read by the judge tool loop) # ────────────────────────────────────────────────────────────────────── def _goals_dump_dir() -> Optional[Path]: """Return ``/goals`` (created on first use), or None on error.""" try: from hermes_constants import get_hermes_home home = Path(get_hermes_home()) except Exception as exc: logger.debug("goals dump dir: get_hermes_home failed: %s", exc) return None try: path = home / "goals" path.mkdir(parents=True, exist_ok=True) return path except Exception as exc: logger.debug("goals dump dir: mkdir failed: %s", exc) return None def _safe_session_filename(session_id: str) -> str: """Make a session_id safe for use as a filename component.""" cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", session_id or "unknown") # Bound length to keep filesystem happy. return cleaned[:128] or "unknown" def conversation_dump_path(session_id: str) -> Optional[Path]: """Where the dumped messages JSON for ``session_id`` lives.""" base = _goals_dump_dir() if base is None: return None return base / f"{_safe_session_filename(session_id)}.json" def dump_conversation(session_id: str, messages: List[Dict[str, Any]]) -> Optional[Path]: """Write ``messages`` to the goals/ dump file. Returns the path on success.""" if not session_id or not messages: return None path = conversation_dump_path(session_id) if path is None: return None try: # Best-effort: messages may contain non-JSON-serializable objects from # provider-specific adapter shims. Fall through with default=str. with open(path, "w", encoding="utf-8") as fh: json.dump(messages, fh, ensure_ascii=False, indent=2, default=str) return path except Exception as exc: logger.debug("dump_conversation: write failed: %s", exc) return None # ────────────────────────────────────────────────────────────────────── # Judge: parsing helpers # ────────────────────────────────────────────────────────────────────── def _truncate(text: str, limit: int) -> str: if not text: return "" if len(text) <= limit: return text return text[:limit] + "… [truncated]" _JSON_OBJECT_RE = re.compile(r"\{.*\}", re.DOTALL) def _extract_json_object(raw: str) -> Optional[Dict[str, Any]]: """Best-effort extraction of a single JSON object from a possibly-prosey reply.""" if not raw: return None text = raw.strip() if text.startswith("```"): text = text.strip("`") nl = text.find("\n") if nl != -1: text = text[nl + 1:] try: data = json.loads(text) except Exception: match = _JSON_OBJECT_RE.search(text) if not match: return None try: data = json.loads(match.group(0)) except Exception: return None return data if isinstance(data, dict) else None def _parse_judge_response(raw: str) -> Tuple[bool, str, bool]: """Parse the freeform judge's reply. Fail-open to ``(False, "", parse_failed)``. Returns ``(done, reason, parse_failed)``. ``parse_failed`` is True when the judge returned output that couldn't be interpreted as the expected JSON verdict (empty body, prose, malformed JSON). Callers use that flag to auto-pause after N consecutive parse failures so a weak judge model doesn't silently burn the turn budget. """ if not raw: return False, "judge returned empty response", True data = _extract_json_object(raw) if data is None: return False, f"judge reply was not JSON: {_truncate(raw, 200)!r}", True done_val = data.get("done") if isinstance(done_val, str): done = done_val.strip().lower() in ("true", "yes", "1", "done") else: done = bool(done_val) reason = str(data.get("reason") or "").strip() if not reason: reason = "no reason provided" return done, reason, False def _parse_decompose_response(raw: str) -> Tuple[List[Dict[str, Any]], bool]: """Parse a Phase-A decompose reply. Returns (items, parse_failed).""" if not raw: return [], True data = _extract_json_object(raw) if data is None: return [], True raw_items = data.get("checklist") if not isinstance(raw_items, list): return [], True out: List[Dict[str, Any]] = [] for item in raw_items: if isinstance(item, dict): text = str(item.get("text", "")).strip() if text: out.append({"text": text}) elif isinstance(item, str): text = item.strip() if text: out.append({"text": text}) return out, False def _parse_evaluate_response(raw: str) -> Tuple[Dict[str, Any], bool]: """Parse a Phase-B checklist eval reply. Returns (parsed, parse_failed). parsed = {"updates": [...], "new_items": [...], "reason": str} """ if not raw: return {"updates": [], "new_items": [], "reason": "judge returned empty response"}, True data = _extract_json_object(raw) if data is None: return ( { "updates": [], "new_items": [], "reason": f"judge reply was not JSON: {_truncate(raw, 200)!r}", }, True, ) updates = data.get("updates") or [] new_items = data.get("new_items") or [] reason = str(data.get("reason") or "").strip() or "no reason provided" norm_updates = [] if isinstance(updates, list): for upd in updates: if not isinstance(upd, dict): continue try: # Judge sees the checklist rendered with 1-based indices # (matches the /subgoal CLI). Convert to 0-based here so the # apply layer can index ``state.checklist`` directly. idx_1based = int(upd.get("index")) except (TypeError, ValueError): continue idx = idx_1based - 1 status = str(upd.get("status", "")).strip().lower() if status not in TERMINAL_ITEM_STATUSES: # Phase-B only accepts terminal flips. Pending → pending is a no-op. continue evidence = str(upd.get("evidence") or "").strip() or None norm_updates.append({"index": idx, "status": status, "evidence": evidence}) norm_new = [] if isinstance(new_items, list): for it in new_items: if isinstance(it, dict): text = str(it.get("text", "")).strip() if text: norm_new.append({"text": text}) elif isinstance(it, str): text = it.strip() if text: norm_new.append({"text": text}) return {"updates": norm_updates, "new_items": norm_new, "reason": reason}, False # ────────────────────────────────────────────────────────────────────── # Judge: read_file tool for the judge's bounded tool loop # ────────────────────────────────────────────────────────────────────── _JUDGE_READ_FILE_TOOL_SCHEMA: Dict[str, Any] = { "type": "function", "function": { "name": "read_file", "description": ( "Read a portion of the dumped conversation history JSON file. " "Use this when the snippet alone isn't enough to rule. Returns " "lines from the file with 1-based line numbers. Pagination " "supported via offset and limit. Reads beyond a built-in cap " "are truncated." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": ( "Absolute path to the conversation history file. " "You were given this in the user message." ), }, "offset": { "type": "integer", "description": "1-indexed starting line number (default 1).", "default": 1, }, "limit": { "type": "integer", "description": ( f"Max lines to return (default {_JUDGE_READ_FILE_MAX_LINES})." ), "default": _JUDGE_READ_FILE_MAX_LINES, }, }, "required": ["path"], }, }, } def _judge_read_file( path: str, *, offset: int = 1, limit: int = _JUDGE_READ_FILE_MAX_LINES, allowed_path: Optional[Path] = None, ) -> str: """Bounded read of the dumped conversation file. Returns JSON-serializable text. Restricted to ``allowed_path`` when provided — the judge cannot use this tool to read arbitrary files. """ if not path: return json.dumps({"error": "path is required"}) try: target = Path(path).resolve() except Exception as exc: return json.dumps({"error": f"path resolve failed: {exc}"}) if allowed_path is not None: try: allowed = allowed_path.resolve() except Exception: allowed = allowed_path if target != allowed: return json.dumps({ "error": ( f"read_file is restricted to the conversation dump path. " f"Allowed: {allowed}" ) }) if not target.exists(): return json.dumps({"error": f"file not found: {target}"}) try: offset = max(1, int(offset or 1)) limit = max(1, min(int(limit or _JUDGE_READ_FILE_MAX_LINES), _JUDGE_READ_FILE_MAX_LINES)) except (TypeError, ValueError): return json.dumps({"error": "offset and limit must be integers"}) try: with open(target, "r", encoding="utf-8", errors="replace") as fh: lines = fh.readlines() except Exception as exc: return json.dumps({"error": f"read failed: {exc}"}) total = len(lines) start = offset - 1 end = min(start + limit, total) slice_lines = lines[start:end] out = "".join(slice_lines) if len(out) > _JUDGE_READ_FILE_MAX_CHARS: out = out[:_JUDGE_READ_FILE_MAX_CHARS] + "\n… [truncated by judge read cap]" return json.dumps({ "path": str(target), "total_lines": total, "offset": offset, "returned": len(slice_lines), "next_offset": end + 1 if end < total else None, "content": out, }, ensure_ascii=False) # ────────────────────────────────────────────────────────────────────── # Judge: phase-A (decompose) and phase-B (evaluate) # ────────────────────────────────────────────────────────────────────── def _get_judge_client() -> Tuple[Optional[Any], str]: """Return (client, model) or (None, '') when unavailable.""" try: from agent.auxiliary_client import get_text_auxiliary_client except Exception as exc: logger.debug("goal judge: auxiliary client import failed: %s", exc) return None, "" try: client, model = get_text_auxiliary_client("goal_judge") except Exception as exc: logger.debug("goal judge: get_text_auxiliary_client failed: %s", exc) return None, "" if client is None or not model: return None, "" return client, model def decompose_goal( goal: str, *, timeout: float = DEFAULT_JUDGE_TIMEOUT, ) -> Tuple[List[Dict[str, Any]], Optional[str]]: """Phase-A: ask the judge to break the goal into a checklist. Returns ``(items, error)``. On any failure, returns ``([], reason)`` so the caller can decide whether to fall back to freeform mode. """ if not goal.strip(): return [], "empty goal" client, model = _get_judge_client() if client is None: return [], "auxiliary client unavailable" try: resp = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": DECOMPOSE_SYSTEM_PROMPT}, { "role": "user", "content": DECOMPOSE_USER_PROMPT_TEMPLATE.format( goal=_truncate(goal, 4000) ), }, ], temperature=0, max_tokens=2000, timeout=timeout, ) except Exception as exc: logger.info("goal decompose: API call failed (%s)", exc) return [], f"decompose error: {type(exc).__name__}" try: raw = resp.choices[0].message.content or "" except Exception: raw = "" items, parse_failed = _parse_decompose_response(raw) if parse_failed or not items: logger.info( "goal decompose: parse failed or empty checklist (raw=%r)", _truncate(raw, 200), ) return [], "decompose parse failed or empty" logger.info("goal decompose: produced %d checklist items", len(items)) return items, None def judge_goal_freeform( goal: str, last_response: str, *, timeout: float = DEFAULT_JUDGE_TIMEOUT, ) -> Tuple[str, str, bool]: """Legacy freeform judge — kept for goals with no checklist. Returns ``(verdict, reason, parse_failed)`` where verdict is ``"done"``, ``"continue"``, or ``"skipped"``. """ if not goal.strip(): return "skipped", "empty goal", False if not last_response.strip(): return "continue", "empty response (nothing to evaluate)", False client, model = _get_judge_client() if client is None: return "continue", "auxiliary client unavailable", False prompt = EVALUATE_USER_PROMPT_FREEFORM_TEMPLATE.format( goal=_truncate(goal, 2000), response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS), ) try: resp = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": EVALUATE_SYSTEM_PROMPT_FREEFORM}, {"role": "user", "content": prompt}, ], temperature=0, max_tokens=200, timeout=timeout, ) except Exception as exc: logger.info("goal judge: API call failed (%s) — falling through to continue", exc) return "continue", f"judge error: {type(exc).__name__}", False try: raw = resp.choices[0].message.content or "" except Exception: raw = "" done, reason, parse_failed = _parse_judge_response(raw) verdict = "done" if done else "continue" logger.info("goal judge (freeform): verdict=%s reason=%s", verdict, _truncate(reason, 120)) return verdict, reason, parse_failed def evaluate_checklist( state: GoalState, last_response: str, *, history_path: Optional[Path], timeout: float = DEFAULT_JUDGE_TIMEOUT, max_tool_calls: int = DEFAULT_MAX_JUDGE_TOOL_CALLS, ) -> Tuple[Dict[str, Any], bool]: """Phase-B: judge evaluates each pending checklist item. Runs a bounded tool loop so the judge can call ``read_file`` on the dumped conversation history when the snippet isn't enough. Returns ``(parsed, parse_failed)`` where parsed is ``{"updates": [...], "new_items": [...], "reason": str}``. Falls open on transport errors: empty updates/new_items, parse_failed=False. """ client, model = _get_judge_client() if client is None: return ({"updates": [], "new_items": [], "reason": "auxiliary client unavailable"}, False) # Render checklist with 1-based indices the judge can address. checklist_block = state.render_checklist(numbered=True) user_prompt = EVALUATE_USER_PROMPT_CHECKLIST_TEMPLATE.format( goal=_truncate(state.goal, 2000), checklist_block=checklist_block, response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS), history_path=str(history_path) if history_path else "(unavailable — judge from snippet only)", ) messages: List[Dict[str, Any]] = [ {"role": "system", "content": EVALUATE_SYSTEM_PROMPT_CHECKLIST}, {"role": "user", "content": user_prompt}, ] # Some auxiliary providers may not support tool calls. We pass tools # optimistically; if the provider returns a verdict directly without # using them, we just parse it. tools = [_JUDGE_READ_FILE_TOOL_SCHEMA] if history_path is not None else None tool_calls_left = max(0, int(max_tool_calls)) final_raw = "" for _ in range(tool_calls_left + 1): try: kwargs: Dict[str, Any] = { "model": model, "messages": messages, "temperature": 0, "max_tokens": 1500, "timeout": timeout, } if tools: kwargs["tools"] = tools kwargs["tool_choice"] = "auto" resp = client.chat.completions.create(**kwargs) except Exception as exc: logger.info("goal judge (checklist): API call failed (%s)", exc) return ( { "updates": [], "new_items": [], "reason": f"judge error: {type(exc).__name__}", }, False, ) try: choice = resp.choices[0] msg = choice.message except Exception: return ( {"updates": [], "new_items": [], "reason": "judge response malformed"}, True, ) # Unpack tool_calls in a way that works for openai-py and other shims. tool_calls = getattr(msg, "tool_calls", None) or [] content = getattr(msg, "content", "") or "" if not tool_calls: final_raw = content break if tool_calls_left <= 0: # Out of budget. Force a final ruling on the next pass by # appending a system note and disabling tools. messages.append({ "role": "user", "content": ( "You have exhausted your read_file budget. Issue your " "final JSON verdict now without calling more tools." ), }) tools = None continue # Append the assistant turn, then handle each tool call. assistant_record: Dict[str, Any] = { "role": "assistant", "content": content, "tool_calls": [], } for tc in tool_calls: try: tc_id = getattr(tc, "id", None) or "tc-?" fn = getattr(tc, "function", None) fn_name = getattr(fn, "name", "") if fn is not None else "" fn_args = getattr(fn, "arguments", "") if fn is not None else "" assistant_record["tool_calls"].append({ "id": tc_id, "type": "function", "function": {"name": fn_name, "arguments": fn_args}, }) except Exception: continue messages.append(assistant_record) for tc in tool_calls: try: tc_id = getattr(tc, "id", None) or "tc-?" fn = getattr(tc, "function", None) fn_name = getattr(fn, "name", "") if fn is not None else "" fn_args_raw = getattr(fn, "arguments", "") if fn is not None else "" except Exception: continue try: args = json.loads(fn_args_raw) if isinstance(fn_args_raw, str) else (fn_args_raw or {}) except Exception: args = {} if fn_name == "read_file": tool_result = _judge_read_file( str(args.get("path", "")), offset=args.get("offset", 1), limit=args.get("limit", _JUDGE_READ_FILE_MAX_LINES), allowed_path=history_path, ) else: tool_result = json.dumps({"error": f"unknown tool: {fn_name}"}) messages.append({ "role": "tool", "tool_call_id": tc_id, "name": fn_name, "content": tool_result, }) tool_calls_left -= 1 if tool_calls_left <= 0: messages.append({ "role": "user", "content": ( "You have exhausted your read_file budget. Issue your " "final JSON verdict now without calling more tools." ), }) tools = None parsed, parse_failed = _parse_evaluate_response(final_raw) logger.info( "goal judge (checklist): updates=%d new_items=%d reason=%s", len(parsed.get("updates") or []), len(parsed.get("new_items") or []), _truncate(parsed.get("reason", ""), 120), ) return parsed, parse_failed # ────────────────────────────────────────────────────────────────────── # GoalManager — the orchestration surface CLI + gateway talk to # ────────────────────────────────────────────────────────────────────── class GoalManager: """Per-session goal state + continuation decisions. The CLI and gateway each hold one ``GoalManager`` per live session. Methods: - ``set(goal)`` — start a new standing goal. - ``clear()`` — remove the active goal. - ``pause()`` / ``resume()`` — explicit user controls. - ``status()`` — printable one-liner. - ``add_subgoal(text)`` — user appends a checklist item. - ``mark_subgoal(index, status)`` — user flips an item (override). - ``remove_subgoal(index)`` — user deletes an item. - ``clear_checklist()`` — user wipes the checklist; next turn re-decomposes. - ``evaluate_after_turn(last_response, agent=None)`` — call the judge, update state, return a decision dict. - ``next_continuation_prompt()`` — the canonical user-role message to feed back into ``run_conversation``. """ def __init__(self, session_id: str, *, default_max_turns: int = DEFAULT_MAX_TURNS): self.session_id = session_id self.default_max_turns = int(default_max_turns or DEFAULT_MAX_TURNS) self._state: Optional[GoalState] = load_goal(session_id) # --- introspection ------------------------------------------------ @property def state(self) -> Optional[GoalState]: return self._state def is_active(self) -> bool: return self._state is not None and self._state.status == "active" def has_goal(self) -> bool: return self._state is not None and self._state.status in ("active", "paused") def status_line(self) -> str: s = self._state if s is None or s.status in ("cleared",): return "No active goal. Set one with /goal ." turns = f"{s.turns_used}/{s.max_turns} turns" cl_total, cl_done, cl_imp, _ = s.checklist_counts() cl_text = "" if cl_total: cl_text = f", {cl_done + cl_imp}/{cl_total} done" if s.status == "active": return f"⊙ Goal (active, {turns}{cl_text}): {s.goal}" if s.status == "paused": extra = f" — {s.paused_reason}" if s.paused_reason else "" return f"⏸ Goal (paused, {turns}{cl_text}{extra}): {s.goal}" if s.status == "done": return f"✓ Goal done ({turns}{cl_text}): {s.goal}" return f"Goal ({s.status}, {turns}{cl_text}): {s.goal}" def render_checklist(self) -> str: """Public helper for the /subgoal slash command.""" if self._state is None: return "(no active goal)" if not self._state.checklist: return "(checklist empty — judge will populate it on the next turn)" return self._state.render_checklist(numbered=True) # --- mutation ----------------------------------------------------- def set(self, goal: str, *, max_turns: Optional[int] = None) -> GoalState: goal = (goal or "").strip() if not goal: raise ValueError("goal text is empty") state = GoalState( goal=goal, status="active", turns_used=0, max_turns=int(max_turns) if max_turns else self.default_max_turns, created_at=time.time(), last_turn_at=0.0, checklist=[], decomposed=False, ) self._state = state save_goal(self.session_id, state) return state def pause(self, reason: str = "user-paused") -> Optional[GoalState]: if not self._state: return None self._state.status = "paused" self._state.paused_reason = reason save_goal(self.session_id, self._state) return self._state def resume(self, *, reset_budget: bool = True) -> Optional[GoalState]: if not self._state: return None self._state.status = "active" self._state.paused_reason = None if reset_budget: self._state.turns_used = 0 save_goal(self.session_id, self._state) return self._state def clear(self) -> None: if self._state is None: return self._state.status = "cleared" save_goal(self.session_id, self._state) self._state = None def mark_done(self, reason: str) -> None: if not self._state: return self._state.status = "done" self._state.last_verdict = "done" self._state.last_reason = reason save_goal(self.session_id, self._state) # --- /subgoal user controls --------------------------------------- def add_subgoal(self, text: str) -> ChecklistItem: """User appends a new checklist item. Requires an active or paused goal.""" if self._state is None: raise RuntimeError("no active goal") text = (text or "").strip() if not text: raise ValueError("subgoal text is empty") item = ChecklistItem( text=text, status=ITEM_PENDING, added_by=ADDED_BY_USER, added_at=time.time(), ) self._state.checklist.append(item) save_goal(self.session_id, self._state) return item def mark_subgoal(self, index_1based: int, status: str) -> ChecklistItem: """User overrides an item's status. ``status`` may be ``completed``, ``impossible``, or ``pending`` (the last only as an undo flow). Stickiness rules do NOT apply to user actions — the user is the only authority that can revert terminal items. """ if self._state is None: raise RuntimeError("no active goal") status = (status or "").strip().lower() if status not in VALID_ITEM_STATUSES: raise ValueError( f"status must be one of {sorted(VALID_ITEM_STATUSES)}; got {status!r}" ) idx = int(index_1based) - 1 if idx < 0 or idx >= len(self._state.checklist): raise IndexError( f"index out of range (1..{len(self._state.checklist)})" ) item = self._state.checklist[idx] item.status = status if status in TERMINAL_ITEM_STATUSES: item.completed_at = time.time() if not item.evidence: item.evidence = "marked by user" else: item.completed_at = None # Don't wipe judge-supplied evidence on undo — useful audit trail. save_goal(self.session_id, self._state) return item def remove_subgoal(self, index_1based: int) -> ChecklistItem: if self._state is None: raise RuntimeError("no active goal") idx = int(index_1based) - 1 if idx < 0 or idx >= len(self._state.checklist): raise IndexError( f"index out of range (1..{len(self._state.checklist)})" ) removed = self._state.checklist.pop(idx) save_goal(self.session_id, self._state) return removed def clear_checklist(self) -> None: """Wipe the checklist and reset decomposed=False so the judge re-decomposes.""" if self._state is None: return self._state.checklist = [] self._state.decomposed = False save_goal(self.session_id, self._state) # --- the main entry point called after every turn ----------------- def evaluate_after_turn( self, last_response: str, *, user_initiated: bool = True, agent: Any = None, messages: Optional[List[Dict[str, Any]]] = None, ) -> Dict[str, Any]: """Run the judge and update state. Return a decision dict. ``user_initiated`` distinguishes a real user prompt (True) from a continuation prompt we fed ourselves (False). Both increment ``turns_used`` because both consume model budget. ``messages`` is the agent's full conversation list for this session. When provided, it's dumped to ``/goals/.json`` so the Phase-B judge's read_file tool can inspect history. Optional — when missing, the judge runs from the snippet only. ``agent`` is a back-compat path — when ``messages`` is None we try to extract them from common AIAgent attribute names. Most callers should pass ``messages`` directly because AIAgent does not store the message list as a public instance attribute. Decision keys: - ``status``: current goal status after update - ``should_continue``: bool — caller should fire another turn - ``continuation_prompt``: str or None - ``verdict``: "done" | "continue" | "skipped" | "inactive" | "decompose" - ``reason``: str - ``message``: user-visible one-liner to print/send """ state = self._state if state is None or state.status != "active": return { "status": state.status if state else None, "should_continue": False, "continuation_prompt": None, "verdict": "inactive", "reason": "no active goal", "message": "", } # Count the turn that just finished. state.turns_used += 1 state.last_turn_at = time.time() # ── Phase A: decompose (first call after /goal set) ─────────── if not state.decomposed: items, err = decompose_goal(state.goal) state.decomposed = True decompose_message = "" if items: now = time.time() for entry in items: state.checklist.append( ChecklistItem( text=entry["text"], status=ITEM_PENDING, added_by=ADDED_BY_JUDGE, added_at=now, ) ) state.last_verdict = "decompose" state.last_reason = f"decomposed into {len(items)} items" decompose_message = ( f"⊙ Goal checklist created ({len(items)} items). " f"Use /subgoal to view or edit it." ) save_goal(self.session_id, state) return { "status": "active", "should_continue": True, "continuation_prompt": self.next_continuation_prompt(), "verdict": "decompose", "reason": state.last_reason, "message": decompose_message, } # Decompose failed — fall through to freeform mode below. logger.info("goal: decompose failed (%s) — falling back to freeform judge", err) state.last_reason = f"decompose failed: {err}" # ── Phase B: evaluate ──────────────────────────────────────── verdict, reason, parse_failed = self._evaluate_state_phase_b( state, last_response, agent=agent, messages=messages ) state.last_verdict = verdict state.last_reason = reason # Track consecutive judge parse failures. Reset on any usable reply, # including API / transport errors (parse_failed=False) so a flaky # network doesn't trip the auto-pause meant for bad judge models. if parse_failed: state.consecutive_parse_failures += 1 else: state.consecutive_parse_failures = 0 if verdict == "done": state.status = "done" save_goal(self.session_id, state) return { "status": "done", "should_continue": False, "continuation_prompt": None, "verdict": "done", "reason": reason, "message": f"✓ Goal achieved: {reason}", } # Auto-pause when the judge model can't produce the expected JSON # verdict N turns in a row. if state.consecutive_parse_failures >= DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES: state.status = "paused" state.paused_reason = ( f"judge model returned unparseable output {state.consecutive_parse_failures} turns in a row" ) save_goal(self.session_id, state) return { "status": "paused", "should_continue": False, "continuation_prompt": None, "verdict": "continue", "reason": reason, "message": ( f"⏸ Goal paused — the judge model ({state.consecutive_parse_failures} turns) " "isn't returning the required JSON verdict. Route the judge to a stricter " "model in ~/.hermes/config.yaml:\n" " auxiliary:\n" " goal_judge:\n" " provider: openrouter\n" " model: google/gemini-3-flash-preview\n" "Then /goal resume to continue." ), } if state.turns_used >= state.max_turns: state.status = "paused" state.paused_reason = f"turn budget exhausted ({state.turns_used}/{state.max_turns})" save_goal(self.session_id, state) return { "status": "paused", "should_continue": False, "continuation_prompt": None, "verdict": "continue", "reason": reason, "message": ( f"⏸ Goal paused — {state.turns_used}/{state.max_turns} turns used. " "Use /goal resume to keep going, or /goal clear to stop." ), } save_goal(self.session_id, state) cl_total, cl_done, cl_imp, _ = state.checklist_counts() progress = "" if cl_total: progress = f" — {cl_done + cl_imp}/{cl_total} done" return { "status": "active", "should_continue": True, "continuation_prompt": self.next_continuation_prompt(), "verdict": "continue", "reason": reason, "message": ( f"↻ Continuing toward goal ({state.turns_used}/{state.max_turns}{progress}): {reason}" ), } def _evaluate_state_phase_b( self, state: GoalState, last_response: str, *, agent: Any = None, messages: Optional[List[Dict[str, Any]]] = None, ) -> Tuple[str, str, bool]: """Run the right kind of Phase-B evaluation given current state. With a non-empty checklist: harsh per-item evaluation with a bounded read_file tool loop. With an empty checklist (e.g. decompose failed twice): fall back to the legacy freeform judge so the goal still has a way to terminate. """ if not last_response.strip(): return "continue", "empty response (nothing to evaluate)", False if state.checklist: # Dump conversation history if we have one. Prefer explicit # ``messages`` arg (most reliable); fall back to extracting from # the agent instance for back-compat. history_path: Optional[Path] = None msgs: List[Dict[str, Any]] = [] if messages: msgs = list(messages) elif agent is not None: msgs = self._extract_agent_messages(agent) if msgs: history_path = dump_conversation(self.session_id, msgs) if history_path is None: logger.debug( "goal: conversation dump failed for session %s", self.session_id, ) else: logger.debug( "goal: no messages available for session %s — judge will run from snippet only", self.session_id, ) parsed, parse_failed = evaluate_checklist( state, last_response, history_path=history_path ) self._apply_checklist_updates(state, parsed) if state.all_terminal(): return "done", parsed.get("reason") or "all checklist items terminal", parse_failed return "continue", parsed.get("reason") or "checklist progress", parse_failed # No checklist — freeform fallback. verdict, reason, parse_failed = judge_goal_freeform(state.goal, last_response) return verdict, reason, parse_failed # --- internal helpers --------------------------------------------- @staticmethod def _extract_agent_messages(agent: Any) -> List[Dict[str, Any]]: """Best-effort extraction of the agent's conversation history. Tries common attribute names so we don't tightly couple to AIAgent. Returns an empty list when nothing is available. """ for attr in ("messages", "conversation_history", "_messages", "history"): try: msgs = getattr(agent, attr, None) if isinstance(msgs, list) and msgs: return msgs except Exception: continue return [] @staticmethod def _apply_checklist_updates(state: GoalState, parsed: Dict[str, Any]) -> None: """Apply judge updates with stickiness: never regress terminal items.""" now = time.time() for upd in parsed.get("updates") or []: try: idx = int(upd["index"]) except (KeyError, TypeError, ValueError): continue if idx < 0 or idx >= len(state.checklist): continue item = state.checklist[idx] if item.status in TERMINAL_ITEM_STATUSES: # Stickiness: judge cannot regress a terminal item. continue new_status = upd.get("status") if new_status not in TERMINAL_ITEM_STATUSES: continue item.status = new_status item.completed_at = now evidence = upd.get("evidence") if evidence: item.evidence = evidence for new_item in parsed.get("new_items") or []: text = (new_item.get("text") or "").strip() if not text: continue state.checklist.append( ChecklistItem( text=text, status=ITEM_PENDING, added_by=ADDED_BY_JUDGE, added_at=now, ) ) # --- continuation prompt ------------------------------------------ def next_continuation_prompt(self) -> Optional[str]: if not self._state or self._state.status != "active": return None if not self._state.checklist: return CONTINUATION_PROMPT_TEMPLATE.format(goal=self._state.goal) cl_total, cl_done, cl_imp, _ = self._state.checklist_counts() return CONTINUATION_PROMPT_WITH_CHECKLIST_TEMPLATE.format( goal=self._state.goal, done=cl_done + cl_imp, total=cl_total, checklist=self._state.render_checklist(numbered=False), ) # Public name kept for back-compat with the previous freeform-only API. def judge_goal( goal: str, last_response: str, *, timeout: float = DEFAULT_JUDGE_TIMEOUT, ) -> Tuple[str, str, bool]: """Back-compat wrapper — defers to the freeform judge.""" return judge_goal_freeform(goal, last_response, timeout=timeout) __all__ = [ "ChecklistItem", "GoalState", "GoalManager", "CONTINUATION_PROMPT_TEMPLATE", "CONTINUATION_PROMPT_WITH_CHECKLIST_TEMPLATE", "DEFAULT_MAX_TURNS", "DEFAULT_MAX_JUDGE_TOOL_CALLS", "ITEM_PENDING", "ITEM_COMPLETED", "ITEM_IMPOSSIBLE", "ITEM_MARKERS", "TERMINAL_ITEM_STATUSES", "VALID_ITEM_STATUSES", "load_goal", "save_goal", "clear_goal", "judge_goal", "judge_goal_freeform", "decompose_goal", "evaluate_checklist", "conversation_dump_path", "dump_conversation", ]