mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-24 10:52:21 +00:00
* feat(goals): add /goal wait <pid> barrier to park the loop on a background process
The /goal loop re-pokes the agent every turn via the post-turn judge. When a
goal is gated on a long-running background process (CI poller, build, test
matrix, deploy) that produces nothing to judge yet, this spins the agent into
'is it done?' busy-work and burns the turn budget.
/goal wait <pid> [reason] parks the loop: while the PID is alive, the judge is
skipped, no turn is consumed, no continuation fires, and /goal status shows a
parked indicator. The barrier auto-clears the moment the process exits (the
agent's notify_on_complete watcher is the natural wake signal), then the next
turn resumes normal judging. /goal unwait clears it manually; pause/resume/clear
drop it; a dead/stale PID can never wedge the loop.
Wired across CLI, gateway, and the mid-run command guard for parity. Barrier
persists in SessionDB.state_meta (survives /resume); GoalState gains
backward-compatible waiting_on_pid/waiting_reason/waiting_since fields. 12 new
tests; docs updated.
* fix(goals): use gateway.status._pid_exists for liveness, not os.kill(pid,0)
The Windows-footguns CI guard flagged os.kill(pid, 0) in _pid_alive — on
Windows that's not a no-op, it routes to CTRL_C_EVENT and hard-kills the
target's console process group (bpo-14484). Delegate to the canonical
footgun-safe gateway.status._pid_exists (psutil + ctypes/POSIX fallback)
instead, with a direct-psutil last resort.
* feat(goals): judge-driven auto-wait — the loop parks itself, no manual /goal wait
Makes the wait barrier automatic. Every turn the judge is shown the agent's
live background processes (pid, command, uptime, output tail from the
process_registry) alongside the goal + response, and can return a new 'wait'
verdict instead of continue:
{"verdict":"wait","wait_on_pid":N} → park until that process exits
{"verdict":"wait","wait_for_seconds":N} → park until the deadline passes
evaluate_after_turn acts on the directive (sets the barrier, parks the loop)
so the agent isn't re-poked into busy-work while CI/builds/deploys run. Adds a
time-based waiting_until barrier alongside the pid barrier; both auto-clear and
can never wedge the loop. Drivers (CLI, gateway, tui_gateway) feed the live
registry in via gather_background_processes(). Manual /goal wait stays as an
override. Judge verdict contract widened to (verdict, reason, parse_failed,
wait_directive); legacy {"done":bool} shape still accepted.
* test(goals): update kanban _fake_judge to the 4-tuple judge contract
CI test(3) caught it: test_kanban_goal_mode's _fake_judge still returned the
3-tuple (verdict, reason, parse_failed), but the kanban loop now unpacks the
4-tuple (+ wait_directive). Update the fake to return None for the directive
and accept the background_processes kwarg.
* feat(goals): trigger-based wait — park on a process's own signal, not just exit
Addresses two gaps in the judge-driven wait: (1) the judge could only express
'wait until PID exits' or 'wait N seconds', so a long-lived watcher/server that
fires a trigger MID-RUN (and may never exit) couldn't be waited on; (2) the
process's own watch_patterns/notify_on_complete trigger was invisible to the judge.
Adds a session-based barrier (waiting_on_session) that releases on the process's
OWN trigger via process_registry.is_session_waiting(): the session exits, OR (if
started with watch_patterns) its pattern matches — even while the process keeps
running. list_sessions() now surfaces session_id + watch_patterns/watch_hit/
notify_on_complete so the judge sees the trigger and is told to prefer
wait_on_session for trigger processes. Judge verdict gains a {wait_on_session}
directive (preferred over pid). Backward-compatible GoalState field; pid + time
barriers unchanged.
Tests: TestSessionTriggerBarrier (release on mid-run pattern match while alive,
release on exit, unknown-session, full park→trigger→resume, parse, validation,
backcompat load). 105 goal-surface + 85 process_registry tests green.
1385 lines
60 KiB
Python
1385 lines
60 KiB
Python
"""Persistent session goals — the Ralph loop for Hermes.
|
|
|
|
A goal is a free-form user objective that stays active across turns. After
|
|
each turn completes, a small judge call asks an auxiliary model "is this
|
|
goal satisfied by the assistant's last response?". If not, Hermes feeds a
|
|
continuation prompt back into the same session and keeps working until the
|
|
goal is done, turn budget is exhausted, the user pauses/clears it, or the
|
|
user sends a new message (which takes priority and pauses the goal loop).
|
|
|
|
State is persisted in SessionDB's ``state_meta`` table keyed by
|
|
``goal:<session_id>`` so ``/resume`` picks it up.
|
|
|
|
Design notes / invariants:
|
|
|
|
- The continuation prompt is just a normal user message appended to the
|
|
session via ``run_conversation``. No system-prompt mutation, no toolset
|
|
swap — prompt caching stays intact.
|
|
- Judge failures are fail-OPEN: ``continue``. A broken judge must not wedge
|
|
progress; the turn budget is the backstop.
|
|
- When a real user message arrives mid-loop it preempts the continuation
|
|
prompt and also pauses the goal loop for that turn (we still re-judge
|
|
after, so if the user's message happens to complete the goal the judge
|
|
will say ``done``).
|
|
- This module has zero hard dependency on ``cli.HermesCLI`` or the gateway
|
|
runner — both wire the same ``GoalManager`` in.
|
|
|
|
Nothing in this module touches the agent's system prompt or toolset.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# Constants & defaults
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
DEFAULT_MAX_TURNS = 20
|
|
DEFAULT_JUDGE_TIMEOUT = 30.0
|
|
# Judge output budget. The freeform judge returns a one-line JSON verdict, but
|
|
# reasoning models (deepseek-v4, qwq, etc.) burn tokens on hidden reasoning
|
|
# before emitting the visible JSON — and the first /goal turn's prompt is
|
|
# larger than later turns, which pushes total reply length past tight caps.
|
|
# 200 tokens (the original default) reliably truncated the JSON on reasoning
|
|
# models, leaving '{"done": true, "reason": "The agent successfully' and
|
|
# triggering the auto-pause. 4096 covers reasoning + verdict on every model
|
|
# we've live-tested; override via auxiliary.goal_judge.max_tokens for
|
|
# specifically constrained setups.
|
|
DEFAULT_JUDGE_MAX_TOKENS = 4096
|
|
# Cap how much of the last response + recent messages we send to the judge.
|
|
_JUDGE_RESPONSE_SNIPPET_CHARS = 4000
|
|
# After this many consecutive judge *parse* failures (empty output / non-JSON),
|
|
# the loop auto-pauses and points the user at the goal_judge config. API /
|
|
# transport errors do NOT count toward this — those are transient. This guards
|
|
# against small models (e.g. deepseek-v4-flash) that cannot follow the strict
|
|
# JSON reply contract; without it the loop runs until the turn budget is
|
|
# exhausted with every reply shaped like `judge returned empty response` or
|
|
# `judge reply was not JSON`.
|
|
DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES = 3
|
|
|
|
|
|
CONTINUATION_PROMPT_TEMPLATE = (
|
|
"[Continuing toward your standing goal]\n"
|
|
"Goal: {goal}\n\n"
|
|
"Continue working toward this goal. Take the next concrete step. "
|
|
"If you believe the goal is complete, state so explicitly and stop. "
|
|
"If you are blocked and need input from the user, say so clearly and stop."
|
|
)
|
|
|
|
# Used when the user has added one or more /subgoal criteria. Surfaced
|
|
# to the agent verbatim so it sees what to target on the next turn,
|
|
# and surfaced to the judge so the verdict considers them too.
|
|
CONTINUATION_PROMPT_WITH_SUBGOALS_TEMPLATE = (
|
|
"[Continuing toward your standing goal]\n"
|
|
"Goal: {goal}\n\n"
|
|
"Additional criteria the user added mid-loop:\n"
|
|
"{subgoals_block}\n\n"
|
|
"Continue working toward the goal AND all additional criteria. Take "
|
|
"the next concrete step. If you believe the goal and every "
|
|
"additional criterion are complete, state so explicitly and stop. "
|
|
"If you are blocked and need input from the user, say so clearly "
|
|
"and stop."
|
|
)
|
|
|
|
|
|
JUDGE_SYSTEM_PROMPT = (
|
|
"You are a strict judge evaluating whether an autonomous agent has "
|
|
"achieved a user's stated goal. You receive the goal text, the agent's "
|
|
"most recent response, and — when present — a list of background "
|
|
"processes the agent has running. Decide one of three verdicts.\n\n"
|
|
"DONE — the goal is fully satisfied:\n"
|
|
"- The response explicitly confirms the goal was completed, OR\n"
|
|
"- The response clearly shows the final deliverable was produced, OR\n"
|
|
"- The response explains the goal is unachievable / blocked / needs "
|
|
"user input (treat this as DONE with reason describing the block).\n\n"
|
|
"WAIT — the goal is NOT done, but the next step is to wait for async "
|
|
"work to finish rather than act again. Choose this ONLY when the agent's "
|
|
"progress is genuinely gated on something running on its own:\n"
|
|
"- A background process listed below is still running AND the response "
|
|
"shows the agent is waiting on its result (e.g. a CI poller, build, "
|
|
"test run, deploy). If the process has a session id, return it in "
|
|
"``wait_on_session`` — that releases when the process exits OR its "
|
|
"watch_patterns trigger fires (use this for a long-lived watcher that "
|
|
"signals mid-run and may never exit). Otherwise return its pid in "
|
|
"``wait_on_pid`` (releases on exit only).\n"
|
|
"- The agent says it is rate-limited / backing off / must wait a fixed "
|
|
"period — return seconds in ``wait_for_seconds``.\n"
|
|
"Picking WAIT parks the loop without burning a turn; it resumes "
|
|
"automatically when the pid exits or the time elapses. Do NOT pick WAIT "
|
|
"just because work remains — only when re-poking now would be pure "
|
|
"busy-work because the agent can't progress until the async thing "
|
|
"finishes.\n\n"
|
|
"CONTINUE — not done, and there is a concrete next step the agent can "
|
|
"take right now. This is the default when in doubt.\n\n"
|
|
"Reply ONLY with a single JSON object on one line. Shapes:\n"
|
|
'{"verdict": "done", "reason": "<one sentence>"}\n'
|
|
'{"verdict": "continue", "reason": "<one sentence>"}\n'
|
|
'{"verdict": "wait", "wait_on_session": "<id>", "reason": "<one sentence>"}\n'
|
|
'{"verdict": "wait", "wait_on_pid": <int>, "reason": "<one sentence>"}\n'
|
|
'{"verdict": "wait", "wait_for_seconds": <int>, "reason": "<one sentence>"}\n'
|
|
"The legacy shape {\"done\": <true|false>, \"reason\": \"...\"} is still "
|
|
"accepted (true=done, false=continue)."
|
|
)
|
|
|
|
|
|
# Rendered into the judge prompt when the agent has background processes
|
|
# running. Gives the judge the context it needs to decide WAIT vs CONTINUE
|
|
# (and which pid to wait on) without it having to probe anything itself.
|
|
JUDGE_BACKGROUND_BLOCK_TEMPLATE = (
|
|
"Background processes the agent currently has running (it may be waiting "
|
|
"on one of these):\n{background_lines}\n\n"
|
|
)
|
|
|
|
|
|
JUDGE_USER_PROMPT_TEMPLATE = (
|
|
"Goal:\n{goal}\n\n"
|
|
"Agent's most recent response:\n{response}\n\n"
|
|
"{background_block}"
|
|
"Current time: {current_time}\n\n"
|
|
"Is the goal satisfied — done, continue, or wait?"
|
|
)
|
|
|
|
# Used when the user has added /subgoal criteria. The judge must
|
|
# evaluate ALL of them being met, not just the original goal.
|
|
JUDGE_USER_PROMPT_WITH_SUBGOALS_TEMPLATE = (
|
|
"Goal:\n{goal}\n\n"
|
|
"Additional criteria the user added mid-loop (all must also be "
|
|
"satisfied for the goal to be DONE):\n{subgoals_block}\n\n"
|
|
"Agent's most recent response:\n{response}\n\n"
|
|
"{background_block}"
|
|
"Current time: {current_time}\n\n"
|
|
"Decision: For each numbered criterion above, find concrete "
|
|
"evidence in the agent's response that the criterion is "
|
|
"satisfied. Do not accept generic phrases like 'all requirements "
|
|
"met' or 'implying it was done' — require specific evidence (a "
|
|
"file contents excerpt, an output line, a command result). If "
|
|
"ANY criterion lacks specific evidence in the response, the goal "
|
|
"is NOT done — return CONTINUE (or WAIT if blocked on a listed "
|
|
"background process).\n\n"
|
|
"Is the goal AND every additional criterion satisfied?"
|
|
)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# Dataclass
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@dataclass
|
|
class GoalState:
|
|
"""Serializable goal state stored per session."""
|
|
|
|
goal: str
|
|
status: str = "active" # active | paused | done | cleared
|
|
turns_used: int = 0
|
|
max_turns: int = DEFAULT_MAX_TURNS
|
|
created_at: float = 0.0
|
|
last_turn_at: float = 0.0
|
|
last_verdict: Optional[str] = None # "done" | "continue" | "skipped"
|
|
last_reason: Optional[str] = None
|
|
paused_reason: Optional[str] = None # why we auto-paused (budget, etc.)
|
|
consecutive_parse_failures: int = 0 # judge-output parse failures in a row
|
|
# User-added criteria appended mid-loop via the /subgoal command.
|
|
# When non-empty the judge prompt and continuation prompt both
|
|
# include them so the agent works toward them and the judge factors
|
|
# them into the verdict. Backwards-compatible: defaults to empty so
|
|
# old state_meta rows load unchanged.
|
|
subgoals: List[str] = field(default_factory=list)
|
|
# Wait barrier: when the agent is blocked on long-running async work
|
|
# (CI poller, build, test run, deploy, rate-limit cooldown) the goal loop
|
|
# PARKS instead of being re-poked every turn into busy-work. Two barrier
|
|
# kinds, set automatically by the judge (which now sees the live
|
|
# background-process list and can return a ``wait`` verdict) or manually
|
|
# via ``/goal wait``:
|
|
# • ``waiting_on_pid`` — park until that process exits.
|
|
# • ``waiting_on_session`` — park until that process_registry session's
|
|
# OWN trigger fires: it exits, OR (if it has watch_patterns) its
|
|
# pattern matches. Covers long-lived watchers/servers that signal
|
|
# mid-run via a trigger and may never exit. Preferred over raw pid
|
|
# when the agent set up a watch_patterns/notify_on_complete process.
|
|
# • ``waiting_until`` — park until this wall-clock epoch (time backoff).
|
|
# While ANY is active, ``evaluate_after_turn`` short-circuits to
|
|
# should_continue=False without burning a turn or calling the judge. The
|
|
# barrier auto-clears when the pid exits / the trigger fires / the deadline
|
|
# passes, then the next turn resumes normal judging. Cleared by that,
|
|
# ``/goal unwait``, pause, resume, or clear. Backwards-compatible: old
|
|
# state_meta rows load with no barrier.
|
|
waiting_on_pid: Optional[int] = None
|
|
waiting_on_session: Optional[str] = None
|
|
waiting_until: float = 0.0
|
|
waiting_reason: Optional[str] = None
|
|
waiting_since: float = 0.0
|
|
|
|
def to_json(self) -> str:
|
|
return json.dumps(asdict(self), ensure_ascii=False)
|
|
|
|
@classmethod
|
|
def from_json(cls, raw: str) -> "GoalState":
|
|
data = json.loads(raw)
|
|
raw_subgoals = data.get("subgoals") or []
|
|
subgoals: List[str] = []
|
|
if isinstance(raw_subgoals, list):
|
|
subgoals = [str(s).strip() for s in raw_subgoals if str(s).strip()]
|
|
return cls(
|
|
goal=data.get("goal", ""),
|
|
status=data.get("status", "active"),
|
|
turns_used=int(data.get("turns_used", 0) or 0),
|
|
max_turns=int(data.get("max_turns", DEFAULT_MAX_TURNS) or DEFAULT_MAX_TURNS),
|
|
created_at=float(data.get("created_at", 0.0) or 0.0),
|
|
last_turn_at=float(data.get("last_turn_at", 0.0) or 0.0),
|
|
last_verdict=data.get("last_verdict"),
|
|
last_reason=data.get("last_reason"),
|
|
paused_reason=data.get("paused_reason"),
|
|
consecutive_parse_failures=int(data.get("consecutive_parse_failures", 0) or 0),
|
|
subgoals=subgoals,
|
|
waiting_on_pid=(int(data["waiting_on_pid"]) if data.get("waiting_on_pid") else None),
|
|
waiting_on_session=(str(data["waiting_on_session"]) if data.get("waiting_on_session") else None),
|
|
waiting_until=float(data.get("waiting_until", 0.0) or 0.0),
|
|
waiting_reason=data.get("waiting_reason"),
|
|
waiting_since=float(data.get("waiting_since", 0.0) or 0.0),
|
|
)
|
|
|
|
# --- subgoals helpers -------------------------------------------------
|
|
|
|
def render_subgoals_block(self) -> str:
|
|
"""Render the subgoals as a numbered ``- N. text`` block. Empty
|
|
when no subgoals exist."""
|
|
if not self.subgoals:
|
|
return ""
|
|
return "\n".join(f"- {i}. {text}" for i, text in enumerate(self.subgoals, start=1))
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# Persistence (SessionDB state_meta)
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _meta_key(session_id: str) -> str:
|
|
return f"goal:{session_id}"
|
|
|
|
|
|
_DB_CACHE: Dict[str, Any] = {}
|
|
|
|
|
|
def _get_session_db() -> Optional[Any]:
|
|
"""Return a SessionDB instance for the current HERMES_HOME.
|
|
|
|
SessionDB has no built-in singleton, but opening a new connection per
|
|
/goal call would thrash the file. We cache one instance per
|
|
``hermes_home`` path so profile switches still pick up the right DB.
|
|
Defensive against import/instantiation failures so tests and
|
|
non-standard launchers can still use the GoalManager.
|
|
"""
|
|
try:
|
|
from hermes_constants import get_hermes_home
|
|
from hermes_state import SessionDB
|
|
|
|
home = str(get_hermes_home())
|
|
except Exception as exc: # pragma: no cover
|
|
logger.debug("GoalManager: SessionDB bootstrap failed (%s)", exc)
|
|
return None
|
|
|
|
cached = _DB_CACHE.get(home)
|
|
if cached is not None:
|
|
return cached
|
|
try:
|
|
db = SessionDB()
|
|
except Exception as exc: # pragma: no cover
|
|
logger.debug("GoalManager: SessionDB() raised (%s)", exc)
|
|
return None
|
|
_DB_CACHE[home] = db
|
|
return db
|
|
|
|
|
|
def load_goal(session_id: str) -> Optional[GoalState]:
|
|
"""Load the goal for a session, or None if none exists."""
|
|
if not session_id:
|
|
return None
|
|
db = _get_session_db()
|
|
if db is None:
|
|
return None
|
|
try:
|
|
raw = db.get_meta(_meta_key(session_id))
|
|
except Exception as exc:
|
|
logger.debug("GoalManager: get_meta failed: %s", exc)
|
|
return None
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return GoalState.from_json(raw)
|
|
except Exception as exc:
|
|
logger.warning("GoalManager: could not parse stored goal for %s: %s", session_id, exc)
|
|
return None
|
|
|
|
|
|
def save_goal(session_id: str, state: GoalState) -> None:
|
|
"""Persist a goal to SessionDB. No-op if DB unavailable."""
|
|
if not session_id:
|
|
return
|
|
db = _get_session_db()
|
|
if db is None:
|
|
return
|
|
try:
|
|
db.set_meta(_meta_key(session_id), state.to_json())
|
|
except Exception as exc:
|
|
logger.debug("GoalManager: set_meta failed: %s", exc)
|
|
|
|
|
|
def clear_goal(session_id: str) -> None:
|
|
"""Mark a goal cleared in the DB (preserved for audit, status=cleared)."""
|
|
state = load_goal(session_id)
|
|
if state is None:
|
|
return
|
|
state.status = "cleared"
|
|
save_goal(session_id, state)
|
|
|
|
|
|
def migrate_goal_to_session(old_session_id: str, new_session_id: str, *, reason: str = "") -> bool:
|
|
"""Carry a persistent /goal from a parent session to its continuation.
|
|
|
|
Context compression rotates ``session_id`` to a fresh child session,
|
|
but ``load_goal`` does a flat ``goal:<session_id>`` lookup with no
|
|
parent-lineage walk — so an active goal silently dies at the
|
|
compaction boundary (#33618). Copy the goal onto the new session and
|
|
archive the old row as ``cleared`` so exactly one active goal row
|
|
exists per logical conversation (avoids the "two active goals"
|
|
hazard of a pure copy).
|
|
|
|
Returns True when a goal was migrated, False when there was nothing
|
|
to migrate or the DB was unavailable. Best-effort and never raises —
|
|
a failure here must not block compression.
|
|
"""
|
|
if not old_session_id or not new_session_id or old_session_id == new_session_id:
|
|
return False
|
|
try:
|
|
state = load_goal(old_session_id)
|
|
if state is None or getattr(state, "status", None) == "cleared":
|
|
return False
|
|
# Don't clobber a goal already set on the child (e.g. a resumed
|
|
# lineage that re-established its own goal).
|
|
if load_goal(new_session_id) is not None:
|
|
return False
|
|
save_goal(new_session_id, state)
|
|
# Archive the parent's row so it isn't double-counted as active.
|
|
clear_goal(old_session_id)
|
|
logger.debug(
|
|
"GoalManager: migrated goal %s -> %s (%s)",
|
|
old_session_id, new_session_id, reason or "rotation",
|
|
)
|
|
return True
|
|
except Exception as exc: # pragma: no cover - defensive
|
|
logger.debug("GoalManager: goal migration failed: %s", exc)
|
|
return False
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# Judge
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _truncate(text: str, limit: int) -> str:
|
|
if not text:
|
|
return ""
|
|
if len(text) <= limit:
|
|
return text
|
|
return text[:limit] + "… [truncated]"
|
|
|
|
|
|
def _pid_alive(pid: int) -> bool:
|
|
"""Return True if a process with ``pid`` is currently alive.
|
|
|
|
Delegates to ``gateway.status._pid_exists`` — the canonical,
|
|
cross-platform, footgun-safe liveness check (psutil with a ctypes /
|
|
POSIX fallback). Critically this avoids ``os.kill(pid, 0)``, which on
|
|
Windows is NOT a no-op: it routes to ``CTRL_C_EVENT`` and hard-kills the
|
|
target's console process group (bpo-14484). Any error resolves to False
|
|
(treat unknown as dead) so a stale barrier never wedges the loop — the
|
|
worst case is the goal resumes one turn early, which is safe.
|
|
"""
|
|
if not pid or pid <= 0:
|
|
return False
|
|
try:
|
|
from gateway.status import _pid_exists
|
|
|
|
return bool(_pid_exists(int(pid)))
|
|
except Exception:
|
|
pass
|
|
# Last-resort fallback if gateway.status is unavailable: psutil directly.
|
|
try:
|
|
import psutil # type: ignore
|
|
|
|
return bool(psutil.pid_exists(int(pid)))
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _session_waiting(session_id: str) -> bool:
|
|
"""Whether a goal parked on a process_registry session should stay parked.
|
|
|
|
Delegates to ``process_registry.is_session_waiting`` — True while the
|
|
session is running and (if it has watch_patterns) its trigger hasn't fired.
|
|
Fail-safe: any import/registry error yields False (don't wait) so a stale
|
|
barrier can never wedge the loop.
|
|
"""
|
|
if not session_id:
|
|
return False
|
|
try:
|
|
from tools.process_registry import process_registry
|
|
|
|
return bool(process_registry.is_session_waiting(session_id))
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
_JSON_OBJECT_RE = re.compile(r"\{.*?\}", re.DOTALL)
|
|
|
|
|
|
def _goal_judge_max_tokens() -> int:
|
|
"""Resolve auxiliary.goal_judge.max_tokens, falling back to the default.
|
|
|
|
``load_config()`` is cached on the config file's (mtime, size), so calling
|
|
this once per judge turn is cheap. A non-positive or non-int value falls
|
|
back to the default rather than crashing the goal loop.
|
|
"""
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
|
|
cfg = load_config()
|
|
value = (
|
|
(cfg.get("auxiliary") or {})
|
|
.get("goal_judge", {})
|
|
.get("max_tokens", DEFAULT_JUDGE_MAX_TOKENS)
|
|
)
|
|
value = int(value)
|
|
if value > 0:
|
|
return value
|
|
except Exception:
|
|
pass
|
|
return DEFAULT_JUDGE_MAX_TOKENS
|
|
|
|
|
|
def _parse_judge_response(raw: str) -> Tuple[str, str, bool, Optional[Dict[str, Any]]]:
|
|
"""Parse the judge's reply. Fail-open on unusable output.
|
|
|
|
Returns ``(verdict, reason, parse_failed, wait_directive)`` where:
|
|
- ``verdict`` is ``"done"``, ``"continue"``, or ``"wait"``.
|
|
- ``parse_failed`` is True when the judge returned output that couldn't
|
|
be interpreted as the expected JSON verdict (empty body, prose,
|
|
malformed JSON). Callers use it to auto-pause after N consecutive
|
|
parse failures so a weak judge model doesn't silently burn the budget.
|
|
- ``wait_directive`` is set only for ``verdict == "wait"``: a dict with
|
|
``{"pid": int}`` or ``{"seconds": int}`` (whichever the judge supplied).
|
|
``None`` otherwise. If a wait verdict carries neither a usable pid nor
|
|
seconds, it is downgraded to ``continue`` (can't park on nothing).
|
|
|
|
Accepts both the new ``{"verdict": ...}`` shape and the legacy
|
|
``{"done": <bool>}`` shape.
|
|
"""
|
|
if not raw:
|
|
return "continue", "judge returned empty response", True, None
|
|
|
|
text = raw.strip()
|
|
|
|
# Strip markdown code fences the model may wrap JSON in.
|
|
if text.startswith("```"):
|
|
text = text.strip("`")
|
|
# Peel off leading json/JSON/etc tag
|
|
nl = text.find("\n")
|
|
if nl != -1:
|
|
text = text[nl + 1:]
|
|
|
|
# First try: parse the whole blob.
|
|
data: Optional[Dict[str, Any]] = None
|
|
try:
|
|
data = json.loads(text)
|
|
except Exception:
|
|
# Second try: pull the first JSON object out.
|
|
match = _JSON_OBJECT_RE.search(text)
|
|
if match:
|
|
try:
|
|
data = json.loads(match.group(0))
|
|
except Exception:
|
|
data = None
|
|
|
|
if not isinstance(data, dict):
|
|
return "continue", f"judge reply was not JSON: {_truncate(raw, 200)!r}", True, None
|
|
|
|
reason = str(data.get("reason") or "").strip() or "no reason provided"
|
|
|
|
# Determine verdict — prefer the explicit "verdict" field, fall back to
|
|
# the legacy "done" boolean.
|
|
verdict_raw = data.get("verdict")
|
|
if isinstance(verdict_raw, str):
|
|
verdict = verdict_raw.strip().lower()
|
|
else:
|
|
done_val = data.get("done")
|
|
if isinstance(done_val, str):
|
|
done = done_val.strip().lower() in {"true", "yes", "1", "done"}
|
|
else:
|
|
done = bool(done_val)
|
|
verdict = "done" if done else "continue"
|
|
|
|
if verdict not in {"done", "continue", "wait"}:
|
|
verdict = "continue"
|
|
|
|
if verdict != "wait":
|
|
return verdict, reason, False, None
|
|
|
|
# Wait verdict: extract a concrete directive (pid or seconds). Accept a
|
|
# few key spellings the model might emit.
|
|
def _first_int(*keys: str) -> Optional[int]:
|
|
for k in keys:
|
|
v = data.get(k)
|
|
if v is None:
|
|
continue
|
|
try:
|
|
iv = int(v)
|
|
if iv > 0:
|
|
return iv
|
|
except (TypeError, ValueError):
|
|
continue
|
|
return None
|
|
|
|
# Prefer a session-id directive (releases on the process's own trigger —
|
|
# exit OR watch-pattern match), then pid (exit only), then seconds.
|
|
sess = data.get("wait_on_session") or data.get("session_id") or data.get("wait_session")
|
|
if isinstance(sess, str) and sess.strip():
|
|
return "wait", reason, False, {"session_id": sess.strip()}
|
|
pid = _first_int("wait_on_pid", "pid", "wait_pid")
|
|
if pid is not None:
|
|
return "wait", reason, False, {"pid": pid}
|
|
seconds = _first_int("wait_for_seconds", "seconds", "wait_seconds")
|
|
if seconds is not None:
|
|
return "wait", reason, False, {"seconds": seconds}
|
|
# Wait with no usable target — can't park on nothing; treat as continue.
|
|
return "continue", f"{reason} (wait verdict had no target — continuing)", False, None
|
|
|
|
|
|
def _render_background_block(background_processes: Optional[List[Dict[str, Any]]]) -> str:
|
|
"""Render the live background-process list for the judge prompt.
|
|
|
|
Each entry is a ``process_registry.list_sessions()`` dict. Only RUNNING
|
|
processes are worth showing (an exited one is nothing to wait on). Returns
|
|
an empty string when there's nothing running, so the judge prompt is
|
|
byte-identical to the no-background case (no behavior change for the
|
|
common path).
|
|
"""
|
|
if not background_processes:
|
|
return ""
|
|
lines: List[str] = []
|
|
for p in background_processes:
|
|
if not isinstance(p, dict):
|
|
continue
|
|
if p.get("status") == "exited":
|
|
continue
|
|
pid = p.get("pid")
|
|
if not pid:
|
|
continue
|
|
cmd = _truncate(str(p.get("command") or "").replace("\n", " ").strip(), 120)
|
|
uptime = p.get("uptime_seconds")
|
|
tail = _truncate(str(p.get("output_preview") or "").replace("\n", " ").strip(), 120)
|
|
sid = p.get("session_id")
|
|
line = f"- pid {pid}"
|
|
if sid:
|
|
line += f" / session {sid}"
|
|
line += f": {cmd}"
|
|
if uptime is not None:
|
|
line += f" (running {uptime}s)"
|
|
# Surface the process's own trigger so the judge can wait on a
|
|
# mid-run signal (watch-pattern) or completion, not just exit.
|
|
wps = p.get("watch_patterns")
|
|
if wps:
|
|
hit = " [already matched]" if p.get("watch_hit") else ""
|
|
line += f" | watch_patterns={wps}{hit}"
|
|
elif p.get("notify_on_complete"):
|
|
line += " | notify_on_complete"
|
|
if tail:
|
|
line += f" | recent output: {tail}"
|
|
lines.append(line)
|
|
if not lines:
|
|
return ""
|
|
return JUDGE_BACKGROUND_BLOCK_TEMPLATE.format(background_lines="\n".join(lines))
|
|
|
|
|
|
def judge_goal(
|
|
goal: str,
|
|
last_response: str,
|
|
*,
|
|
timeout: float = DEFAULT_JUDGE_TIMEOUT,
|
|
subgoals: Optional[List[str]] = None,
|
|
background_processes: Optional[List[Dict[str, Any]]] = None,
|
|
) -> Tuple[str, str, bool, Optional[Dict[str, Any]]]:
|
|
"""Ask the auxiliary model whether the goal is satisfied.
|
|
|
|
Returns ``(verdict, reason, parse_failed, wait_directive)`` where verdict
|
|
is ``"done"``, ``"continue"``, ``"wait"``, or ``"skipped"`` (when the
|
|
judge couldn't be reached). ``wait_directive`` is set only for ``"wait"``
|
|
(``{"pid": int}`` or ``{"seconds": int}``); ``None`` otherwise.
|
|
|
|
``parse_failed`` is True only when the judge call succeeded but its output
|
|
was unusable (empty or non-JSON). API/transport errors return False — they
|
|
are transient and should fail-open silently. Callers use this flag to
|
|
auto-pause after N consecutive parse failures (see
|
|
``DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES``).
|
|
|
|
``subgoals`` is an optional list of user-added criteria (from
|
|
``/subgoal``) factored into the verdict. ``background_processes`` is the
|
|
live ``process_registry.list_sessions()`` snapshot; when the agent is
|
|
waiting on one (a CI poller, build, etc.) the judge can return a ``wait``
|
|
verdict naming its pid, parking the loop instead of re-poking.
|
|
|
|
This is deliberately fail-open: any error returns ``("continue", ..., False, None)``
|
|
so a broken judge doesn't wedge progress — the turn budget and the
|
|
consecutive-parse-failures auto-pause are the backstops.
|
|
"""
|
|
if not goal.strip():
|
|
return "skipped", "empty goal", False, None
|
|
if not last_response.strip():
|
|
# No substantive reply this turn — almost certainly not done yet.
|
|
return "continue", "empty response (nothing to evaluate)", False, None
|
|
|
|
try:
|
|
from agent.auxiliary_client import get_auxiliary_extra_body, get_text_auxiliary_client
|
|
except Exception as exc:
|
|
logger.debug("goal judge: auxiliary client import failed: %s", exc)
|
|
return "continue", "auxiliary client unavailable", False, None
|
|
|
|
try:
|
|
client, model = get_text_auxiliary_client("goal_judge")
|
|
except Exception as exc:
|
|
logger.debug("goal judge: get_text_auxiliary_client failed: %s", exc)
|
|
return "continue", "auxiliary client unavailable", False, None
|
|
|
|
if client is None or not model:
|
|
return "continue", "no auxiliary client configured", False, None
|
|
|
|
# Build the prompt — pick the with-subgoals variant when applicable.
|
|
clean_subgoals = [s.strip() for s in (subgoals or []) if s and s.strip()]
|
|
background_block = _render_background_block(background_processes)
|
|
current_time = datetime.now(tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
|
|
if clean_subgoals:
|
|
subgoals_block = "\n".join(
|
|
f"- {i}. {text}" for i, text in enumerate(clean_subgoals, start=1)
|
|
)
|
|
prompt = JUDGE_USER_PROMPT_WITH_SUBGOALS_TEMPLATE.format(
|
|
goal=_truncate(goal, 2000),
|
|
subgoals_block=_truncate(subgoals_block, 2000),
|
|
response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS),
|
|
background_block=background_block,
|
|
current_time=current_time,
|
|
)
|
|
else:
|
|
prompt = JUDGE_USER_PROMPT_TEMPLATE.format(
|
|
goal=_truncate(goal, 2000),
|
|
response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS),
|
|
background_block=background_block,
|
|
current_time=current_time,
|
|
)
|
|
|
|
try:
|
|
resp = client.chat.completions.create(
|
|
model=model,
|
|
messages=[
|
|
{"role": "system", "content": JUDGE_SYSTEM_PROMPT},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
temperature=0,
|
|
max_tokens=_goal_judge_max_tokens(),
|
|
timeout=timeout,
|
|
extra_body=get_auxiliary_extra_body() or None,
|
|
)
|
|
except Exception as exc:
|
|
logger.info("goal judge: API call failed (%s) — falling through to continue", exc)
|
|
return "continue", f"judge error: {type(exc).__name__}", False, None
|
|
|
|
try:
|
|
raw = resp.choices[0].message.content or ""
|
|
except Exception:
|
|
raw = ""
|
|
|
|
verdict, reason, parse_failed, wait_directive = _parse_judge_response(raw)
|
|
logger.info(
|
|
"goal judge: verdict=%s reason=%s%s",
|
|
verdict, _truncate(reason, 120),
|
|
f" wait={wait_directive}" if wait_directive else "",
|
|
)
|
|
return verdict, reason, parse_failed, wait_directive
|
|
|
|
|
|
def gather_background_processes(task_id: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Return the live background-process snapshot for the goal judge.
|
|
|
|
Thin, fail-safe wrapper over ``process_registry.list_sessions(task_id)``.
|
|
Returns only RUNNING processes (an exited one is nothing to wait on) and
|
|
never raises — any import/registry failure yields ``[]`` so the goal loop
|
|
degrades to its pre-wait-barrier behavior (judge just won't see processes).
|
|
The drivers (CLI + gateway) call this and pass the result into
|
|
``GoalManager.evaluate_after_turn(background_processes=...)``.
|
|
"""
|
|
try:
|
|
from tools.process_registry import process_registry
|
|
|
|
sessions = process_registry.list_sessions(task_id=task_id) or []
|
|
except Exception as exc:
|
|
logger.debug("gather_background_processes failed: %s", exc)
|
|
return []
|
|
return [s for s in sessions if isinstance(s, dict) and s.get("status") != "exited"]
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# GoalManager — the orchestration surface CLI + gateway talk to
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class GoalManager:
|
|
"""Per-session goal state + continuation decisions.
|
|
|
|
The CLI and gateway each hold one ``GoalManager`` per live session.
|
|
|
|
Methods:
|
|
|
|
- ``set(goal)`` — start a new standing goal.
|
|
- ``clear()`` — remove the active goal.
|
|
- ``pause()`` / ``resume()`` — explicit user controls.
|
|
- ``status()`` — printable one-liner.
|
|
- ``evaluate_after_turn(last_response)`` — call the judge, update state,
|
|
and return a decision dict the caller uses to drive the next turn.
|
|
- ``next_continuation_prompt()`` — the canonical user-role message to
|
|
feed back into ``run_conversation``.
|
|
"""
|
|
|
|
def __init__(self, session_id: str, *, default_max_turns: int = DEFAULT_MAX_TURNS):
|
|
self.session_id = session_id
|
|
self.default_max_turns = int(default_max_turns or DEFAULT_MAX_TURNS)
|
|
self._state: Optional[GoalState] = load_goal(session_id)
|
|
|
|
# --- introspection ------------------------------------------------
|
|
|
|
@property
|
|
def state(self) -> Optional[GoalState]:
|
|
return self._state
|
|
|
|
def is_active(self) -> bool:
|
|
return self._state is not None and self._state.status == "active"
|
|
|
|
def has_goal(self) -> bool:
|
|
return self._state is not None and self._state.status in {"active", "paused"}
|
|
|
|
def status_line(self) -> str:
|
|
s = self._state
|
|
if s is None or s.status in {"cleared",}:
|
|
return "No active goal. Set one with /goal <text>."
|
|
turns = f"{s.turns_used}/{s.max_turns} turns"
|
|
sub = f", {len(s.subgoals)} subgoal{'s' if len(s.subgoals) != 1 else ''}" if s.subgoals else ""
|
|
if s.status == "active":
|
|
if s.waiting_on_session and _session_waiting(s.waiting_on_session):
|
|
wr = s.waiting_reason or f"session {s.waiting_on_session}"
|
|
return f"⏳ Goal (parked on {wr}, {turns}{sub}): {s.goal}"
|
|
if s.waiting_on_pid and _pid_alive(s.waiting_on_pid):
|
|
wr = s.waiting_reason or f"pid {s.waiting_on_pid}"
|
|
return f"⏳ Goal (parked on {wr}, {turns}{sub}): {s.goal}"
|
|
if s.waiting_until and time.time() < s.waiting_until:
|
|
remaining = int(s.waiting_until - time.time())
|
|
wr = s.waiting_reason or f"{remaining}s"
|
|
return f"⏳ Goal (parked {remaining}s — {wr}, {turns}{sub}): {s.goal}"
|
|
return f"⊙ Goal (active, {turns}{sub}): {s.goal}"
|
|
if s.status == "paused":
|
|
extra = f" — {s.paused_reason}" if s.paused_reason else ""
|
|
return f"⏸ Goal (paused, {turns}{sub}{extra}): {s.goal}"
|
|
if s.status == "done":
|
|
return f"✓ Goal done ({turns}{sub}): {s.goal}"
|
|
return f"Goal ({s.status}, {turns}{sub}): {s.goal}"
|
|
|
|
# --- mutation -----------------------------------------------------
|
|
|
|
def set(self, goal: str, *, max_turns: Optional[int] = None) -> GoalState:
|
|
goal = (goal or "").strip()
|
|
if not goal:
|
|
raise ValueError("goal text is empty")
|
|
state = GoalState(
|
|
goal=goal,
|
|
status="active",
|
|
turns_used=0,
|
|
max_turns=int(max_turns) if max_turns else self.default_max_turns,
|
|
created_at=time.time(),
|
|
last_turn_at=0.0,
|
|
)
|
|
self._state = state
|
|
save_goal(self.session_id, state)
|
|
return state
|
|
|
|
def pause(self, reason: str = "user-paused") -> Optional[GoalState]:
|
|
if not self._state:
|
|
return None
|
|
self._state.status = "paused"
|
|
self._state.paused_reason = reason
|
|
# A wait barrier is meaningless once paused — drop it.
|
|
self._state.waiting_on_pid = None
|
|
self._state.waiting_on_session = None
|
|
self._state.waiting_until = 0.0
|
|
self._state.waiting_reason = None
|
|
self._state.waiting_since = 0.0
|
|
save_goal(self.session_id, self._state)
|
|
return self._state
|
|
|
|
def resume(self, *, reset_budget: bool = True) -> Optional[GoalState]:
|
|
if not self._state:
|
|
return None
|
|
self._state.status = "active"
|
|
self._state.paused_reason = None
|
|
# Resuming starts fresh — clear any stale barrier.
|
|
self._state.waiting_on_pid = None
|
|
self._state.waiting_on_session = None
|
|
self._state.waiting_until = 0.0
|
|
self._state.waiting_reason = None
|
|
self._state.waiting_since = 0.0
|
|
if reset_budget:
|
|
self._state.turns_used = 0
|
|
save_goal(self.session_id, self._state)
|
|
return self._state
|
|
|
|
def clear(self) -> None:
|
|
if self._state is None:
|
|
return
|
|
self._state.status = "cleared"
|
|
save_goal(self.session_id, self._state)
|
|
self._state = None
|
|
|
|
def mark_done(self, reason: str) -> None:
|
|
if not self._state:
|
|
return
|
|
self._state.status = "done"
|
|
self._state.last_verdict = "done"
|
|
self._state.last_reason = reason
|
|
save_goal(self.session_id, self._state)
|
|
|
|
# --- /subgoal user controls ---------------------------------------
|
|
|
|
def add_subgoal(self, text: str) -> str:
|
|
"""Append a user-added criterion to the active goal. Requires
|
|
``has_goal()``; raises ``RuntimeError`` otherwise.
|
|
|
|
Returns the cleaned text so the caller can show it back to the user.
|
|
"""
|
|
if self._state is None or not self.has_goal():
|
|
raise RuntimeError("no active goal")
|
|
text = (text or "").strip()
|
|
if not text:
|
|
raise ValueError("subgoal text is empty")
|
|
self._state.subgoals.append(text)
|
|
save_goal(self.session_id, self._state)
|
|
return text
|
|
|
|
def remove_subgoal(self, index_1based: int) -> str:
|
|
"""Remove a subgoal by 1-based index. Returns the removed text."""
|
|
if self._state is None or not self.has_goal():
|
|
raise RuntimeError("no active goal")
|
|
idx = int(index_1based) - 1
|
|
if idx < 0 or idx >= len(self._state.subgoals):
|
|
raise IndexError(
|
|
f"index out of range (1..{len(self._state.subgoals)})"
|
|
)
|
|
removed = self._state.subgoals.pop(idx)
|
|
save_goal(self.session_id, self._state)
|
|
return removed
|
|
|
|
def clear_subgoals(self) -> int:
|
|
"""Wipe all subgoals. Returns the previous count."""
|
|
if self._state is None or not self.has_goal():
|
|
raise RuntimeError("no active goal")
|
|
prev = len(self._state.subgoals)
|
|
self._state.subgoals = []
|
|
save_goal(self.session_id, self._state)
|
|
return prev
|
|
|
|
def render_subgoals(self) -> str:
|
|
"""Public helper for the /subgoal slash command."""
|
|
if self._state is None:
|
|
return "(no active goal)"
|
|
if not self._state.subgoals:
|
|
return "(no subgoals — use /subgoal <text> to add criteria)"
|
|
return self._state.render_subgoals_block()
|
|
|
|
# --- /goal wait barrier -------------------------------------------
|
|
|
|
def wait_on(self, pid: int, reason: str = "") -> GoalState:
|
|
"""Park the goal loop on a background process PID.
|
|
|
|
While the PID is alive, ``evaluate_after_turn`` returns
|
|
``should_continue=False`` without burning a turn or calling the
|
|
judge — the loop quiesces instead of re-poking the agent into busy
|
|
work. The barrier auto-clears when the process exits. Requires an
|
|
active goal. For a process with a watch_patterns/notify_on_complete
|
|
trigger, prefer ``wait_on_session`` so a mid-run trigger (not just
|
|
exit) releases the barrier.
|
|
"""
|
|
if self._state is None or self._state.status != "active":
|
|
raise RuntimeError("no active goal to park")
|
|
pid = int(pid)
|
|
if pid <= 0:
|
|
raise ValueError("pid must be a positive integer")
|
|
self._state.waiting_on_pid = pid
|
|
self._state.waiting_on_session = None
|
|
self._state.waiting_until = 0.0
|
|
self._state.waiting_reason = (reason or "").strip() or None
|
|
self._state.waiting_since = time.time()
|
|
save_goal(self.session_id, self._state)
|
|
return self._state
|
|
|
|
def wait_on_session(self, session_id: str, reason: str = "") -> GoalState:
|
|
"""Park the goal loop on a process_registry session's OWN trigger.
|
|
|
|
Unlike ``wait_on`` (which releases only on PID exit), this releases
|
|
when the session's trigger fires: it exits, OR — if it was started
|
|
with ``watch_patterns`` — its pattern matches. This is the right
|
|
barrier for a long-lived watcher/server/poller that signals mid-run
|
|
and may never exit. Requires an active goal.
|
|
"""
|
|
if self._state is None or self._state.status != "active":
|
|
raise RuntimeError("no active goal to park")
|
|
session_id = str(session_id or "").strip()
|
|
if not session_id:
|
|
raise ValueError("session_id must be a non-empty string")
|
|
self._state.waiting_on_session = session_id
|
|
self._state.waiting_on_pid = None
|
|
self._state.waiting_until = 0.0
|
|
self._state.waiting_reason = (reason or "").strip() or None
|
|
self._state.waiting_since = time.time()
|
|
save_goal(self.session_id, self._state)
|
|
return self._state
|
|
|
|
def wait_for_seconds(self, seconds: int, reason: str = "") -> GoalState:
|
|
"""Park the goal loop until ``seconds`` from now have elapsed.
|
|
|
|
Time-based counterpart to ``wait_on`` — for backoff / cooldown waits
|
|
where there's no process to track (e.g. the agent is rate-limited).
|
|
The barrier auto-clears once the deadline passes. Requires an active
|
|
goal.
|
|
"""
|
|
if self._state is None or self._state.status != "active":
|
|
raise RuntimeError("no active goal to park")
|
|
seconds = int(seconds)
|
|
if seconds <= 0:
|
|
raise ValueError("seconds must be a positive integer")
|
|
self._state.waiting_on_pid = None
|
|
self._state.waiting_on_session = None
|
|
self._state.waiting_until = time.time() + seconds
|
|
self._state.waiting_reason = (reason or "").strip() or None
|
|
self._state.waiting_since = time.time()
|
|
save_goal(self.session_id, self._state)
|
|
return self._state
|
|
|
|
def stop_waiting(self) -> bool:
|
|
"""Clear any active wait barrier (pid / session / time). Returns True
|
|
if one was cleared."""
|
|
if self._state is None:
|
|
return False
|
|
if (
|
|
self._state.waiting_on_pid is None
|
|
and self._state.waiting_on_session is None
|
|
and not self._state.waiting_until
|
|
):
|
|
return False
|
|
self._state.waiting_on_pid = None
|
|
self._state.waiting_on_session = None
|
|
self._state.waiting_until = 0.0
|
|
self._state.waiting_reason = None
|
|
self._state.waiting_since = 0.0
|
|
save_goal(self.session_id, self._state)
|
|
return True
|
|
|
|
def is_waiting(self) -> bool:
|
|
"""True iff a barrier is set AND not yet satisfied.
|
|
|
|
Session barrier: active until the process exits or its watch-pattern
|
|
trigger fires. Pid barrier: active while the process is alive. Time
|
|
barrier: active until the deadline passes. Side effect: a satisfied
|
|
barrier is cleared here (lazy auto-clear) so the next evaluation
|
|
resumes normal judging.
|
|
"""
|
|
s = self._state
|
|
if s is None:
|
|
return False
|
|
if s.waiting_on_session is not None:
|
|
if _session_waiting(s.waiting_on_session):
|
|
return True
|
|
self.stop_waiting() # session exited or trigger fired
|
|
return False
|
|
if s.waiting_on_pid is not None:
|
|
if _pid_alive(s.waiting_on_pid):
|
|
return True
|
|
self.stop_waiting() # process gone
|
|
return False
|
|
if s.waiting_until:
|
|
if time.time() < s.waiting_until:
|
|
return True
|
|
self.stop_waiting() # deadline passed
|
|
return False
|
|
return False
|
|
|
|
# --- the main entry point called after every turn -----------------
|
|
|
|
def evaluate_after_turn(
|
|
self,
|
|
last_response: str,
|
|
*,
|
|
user_initiated: bool = True,
|
|
background_processes: Optional[List[Dict[str, Any]]] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Run the judge and update state. Return a decision dict.
|
|
|
|
``user_initiated`` distinguishes a real user prompt (True) from a
|
|
continuation prompt we fed ourselves (False). Both increment
|
|
``turns_used`` because both consume model budget.
|
|
|
|
``background_processes`` is the live ``process_registry.list_sessions()``
|
|
snapshot for this session. It's handed to the judge so it can decide
|
|
to WAIT on an in-flight process (CI poller, build, ...) instead of
|
|
re-poking the agent — the automatic counterpart to ``/goal wait``.
|
|
|
|
Decision keys:
|
|
- ``status``: current goal status after update
|
|
- ``should_continue``: bool — caller should fire another turn
|
|
- ``continuation_prompt``: str or None
|
|
- ``verdict``: "done" | "continue" | "wait" | "skipped" | "inactive"
|
|
- ``reason``: str
|
|
- ``message``: user-visible one-liner to print/send
|
|
"""
|
|
state = self._state
|
|
if state is None or state.status != "active":
|
|
return {
|
|
"status": state.status if state else None,
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "inactive",
|
|
"reason": "no active goal",
|
|
"message": "",
|
|
}
|
|
|
|
# Wait barrier: if the loop is parked (on a live process OR a time
|
|
# deadline that hasn't passed), quiesce — do NOT burn a turn or call
|
|
# the judge. Resumes automatically once the barrier clears.
|
|
if self.is_waiting():
|
|
if state.waiting_on_session is not None:
|
|
tgt = f"session {state.waiting_on_session}"
|
|
elif state.waiting_on_pid is not None:
|
|
tgt = f"pid {state.waiting_on_pid}"
|
|
else:
|
|
remaining = max(0, int(state.waiting_until - time.time()))
|
|
tgt = f"{remaining}s remaining"
|
|
reason = state.waiting_reason or tgt
|
|
return {
|
|
"status": "active",
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "waiting",
|
|
"reason": reason,
|
|
"message": f"⏳ Goal parked — waiting on {tgt}: {reason}",
|
|
}
|
|
|
|
# Count the turn that just finished.
|
|
state.turns_used += 1
|
|
state.last_turn_at = time.time()
|
|
|
|
verdict, reason, parse_failed, wait_directive = judge_goal(
|
|
state.goal,
|
|
last_response,
|
|
subgoals=state.subgoals or None,
|
|
background_processes=background_processes,
|
|
)
|
|
state.last_verdict = verdict
|
|
state.last_reason = reason
|
|
|
|
# Track consecutive judge parse failures. Reset on any usable reply,
|
|
# including API / transport errors (parse_failed=False) so a flaky
|
|
# network doesn't trip the auto-pause meant for bad judge models.
|
|
if parse_failed:
|
|
state.consecutive_parse_failures += 1
|
|
else:
|
|
state.consecutive_parse_failures = 0
|
|
|
|
# WAIT verdict: the judge decided the agent is blocked on async work
|
|
# and re-poking now would be busy-work. Set the barrier and park —
|
|
# the turn we just counted stands (the judge call happened), but no
|
|
# continuation fires. The loop resumes automatically when the pid
|
|
# exits or the deadline passes (next evaluate_after_turn falls through
|
|
# the is_waiting() short-circuit once the barrier clears).
|
|
if verdict == "wait" and wait_directive:
|
|
if wait_directive.get("session_id"):
|
|
self.wait_on_session(str(wait_directive["session_id"]), reason=reason)
|
|
tgt = f"session {wait_directive['session_id']}"
|
|
elif wait_directive.get("pid"):
|
|
self.wait_on(int(wait_directive["pid"]), reason=reason)
|
|
tgt = f"pid {wait_directive['pid']}"
|
|
else:
|
|
self.wait_for_seconds(int(wait_directive["seconds"]), reason=reason)
|
|
tgt = f"{wait_directive['seconds']}s"
|
|
return {
|
|
"status": "active",
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "wait",
|
|
"reason": reason,
|
|
"message": f"⏳ Goal parked (judge) — waiting on {tgt}: {reason}",
|
|
}
|
|
|
|
if verdict == "done":
|
|
state.status = "done"
|
|
save_goal(self.session_id, state)
|
|
return {
|
|
"status": "done",
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "done",
|
|
"reason": reason,
|
|
"message": f"✓ Goal achieved: {reason}",
|
|
}
|
|
|
|
# Auto-pause when the judge model can't produce the expected JSON
|
|
# verdict N turns in a row. Points the user at the goal_judge config
|
|
# so they can route this side task to a model that follows the
|
|
# contract (e.g. google/gemini-3-flash-preview). Without this guard,
|
|
# weak judge models burn the entire turn budget returning prose or
|
|
# empty strings.
|
|
if state.consecutive_parse_failures >= DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES:
|
|
state.status = "paused"
|
|
state.paused_reason = (
|
|
f"judge model returned unparseable output {state.consecutive_parse_failures} turns in a row"
|
|
)
|
|
save_goal(self.session_id, state)
|
|
return {
|
|
"status": "paused",
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "continue",
|
|
"reason": reason,
|
|
"message": (
|
|
f"⏸ Goal paused — the judge model ({state.consecutive_parse_failures} turns) "
|
|
"isn't returning the required JSON verdict. Route the judge to a stricter "
|
|
"model in ~/.hermes/config.yaml:\n"
|
|
" auxiliary:\n"
|
|
" goal_judge:\n"
|
|
" provider: openrouter\n"
|
|
" model: google/gemini-3-flash-preview\n"
|
|
"Then /goal resume to continue."
|
|
),
|
|
}
|
|
|
|
if state.turns_used >= state.max_turns:
|
|
state.status = "paused"
|
|
state.paused_reason = f"turn budget exhausted ({state.turns_used}/{state.max_turns})"
|
|
save_goal(self.session_id, state)
|
|
return {
|
|
"status": "paused",
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "continue",
|
|
"reason": reason,
|
|
"message": (
|
|
f"⏸ Goal paused — {state.turns_used}/{state.max_turns} turns used. "
|
|
"Use /goal resume to keep going, or /goal clear to stop."
|
|
),
|
|
}
|
|
|
|
save_goal(self.session_id, state)
|
|
return {
|
|
"status": "active",
|
|
"should_continue": True,
|
|
"continuation_prompt": self.next_continuation_prompt(),
|
|
"verdict": "continue",
|
|
"reason": reason,
|
|
"message": (
|
|
f"↻ Continuing toward goal ({state.turns_used}/{state.max_turns}): {reason}"
|
|
),
|
|
}
|
|
|
|
def next_continuation_prompt(self) -> Optional[str]:
|
|
if not self._state or self._state.status != "active":
|
|
return None
|
|
if self._state.subgoals:
|
|
return CONTINUATION_PROMPT_WITH_SUBGOALS_TEMPLATE.format(
|
|
goal=self._state.goal,
|
|
subgoals_block=self._state.render_subgoals_block(),
|
|
)
|
|
return CONTINUATION_PROMPT_TEMPLATE.format(goal=self._state.goal)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
# Kanban worker goal loop
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
|
|
# Continuation prompt fed back to a kanban goal-mode worker that has not
|
|
# yet completed/blocked its task. The card's own acceptance criteria are
|
|
# the goal — the worker already has the full task body in its first turn,
|
|
# so we keep this short and point it back at the lifecycle contract.
|
|
KANBAN_GOAL_CONTINUATION_TEMPLATE = (
|
|
"[Continuing toward this kanban task — judge says it is not done yet]\n"
|
|
"Reason: {reason}\n\n"
|
|
"Take the next concrete step toward completing the task. When the work "
|
|
"is genuinely finished, call kanban_complete with a summary. If you are "
|
|
"blocked and need human input, call kanban_block with a reason. Do not "
|
|
"stop without calling one of them."
|
|
)
|
|
|
|
# Fed when the judge believes the work is done but the worker never called
|
|
# kanban_complete / kanban_block. One explicit nudge to terminate the task
|
|
# the right way before the loop gives up.
|
|
KANBAN_GOAL_FINALIZE_TEMPLATE = (
|
|
"[The work looks complete, but the task is still open]\n"
|
|
"Reason: {reason}\n\n"
|
|
"If the task is genuinely done, call kanban_complete now with a short "
|
|
"summary of what you did. If something still blocks completion, call "
|
|
"kanban_block with the reason instead."
|
|
)
|
|
|
|
|
|
def run_kanban_goal_loop(
|
|
*,
|
|
task_id: str,
|
|
goal_text: str,
|
|
run_turn,
|
|
task_status_fn,
|
|
block_fn,
|
|
max_turns: int = DEFAULT_MAX_TURNS,
|
|
first_response: str = "",
|
|
log=None,
|
|
) -> Dict[str, Any]:
|
|
"""Drive a kanban worker through a Ralph-style goal loop.
|
|
|
|
The dispatcher spawns a goal-mode worker exactly like a normal worker
|
|
(``hermes -p <profile> chat -q "work kanban task <id>"``). The worker's
|
|
first turn has already run by the time this is called; ``first_response``
|
|
is that turn's reply. From here we:
|
|
|
|
1. Check whether the worker already terminated the task (called
|
|
``kanban_complete`` / ``kanban_block``). If so, stop — nothing to do.
|
|
2. Otherwise judge the latest response against ``goal_text`` (the card's
|
|
title + body). ``continue`` → feed a continuation prompt and run
|
|
another turn IN THE SAME SESSION via ``run_turn``. ``done`` but the
|
|
task is still open → one explicit "call kanban_complete" nudge.
|
|
3. When the turn budget is exhausted and the worker still hasn't
|
|
terminated the task, ``block_fn`` is invoked so the card lands in a
|
|
sticky ``blocked`` state for human review (NOT a silent exit).
|
|
|
|
This function performs NO SessionDB persistence — a worker process is
|
|
ephemeral, so the turn budget lives in a local counter. It is fully
|
|
decoupled from the CLI for testability: callers inject ``run_turn``
|
|
(str -> str), ``task_status_fn`` (() -> str|None), and ``block_fn``
|
|
(reason: str -> None).
|
|
|
|
Returns a decision dict: ``{"outcome", "turns_used", "reason"}`` where
|
|
outcome is one of ``"completed_by_worker"``, ``"blocked_budget"``,
|
|
``"blocked_by_worker"``, or ``"stopped"``.
|
|
"""
|
|
|
|
def _log(msg: str) -> None:
|
|
if log is not None:
|
|
try:
|
|
log(msg)
|
|
except Exception:
|
|
pass
|
|
|
|
max_turns = int(max_turns or DEFAULT_MAX_TURNS)
|
|
if max_turns < 1:
|
|
max_turns = DEFAULT_MAX_TURNS
|
|
|
|
last_response = first_response or ""
|
|
# The first turn already consumed one unit of budget.
|
|
turns_used = 1
|
|
nudged_to_finalize = False
|
|
|
|
while True:
|
|
# Did the worker terminate the task itself this turn?
|
|
try:
|
|
status = task_status_fn()
|
|
except Exception as exc:
|
|
_log(f"kanban goal loop: status check failed ({exc}); stopping")
|
|
return {"outcome": "stopped", "turns_used": turns_used, "reason": "status check failed"}
|
|
|
|
if status == "done":
|
|
_log(f"kanban goal loop: task {task_id} completed by worker after {turns_used} turn(s)")
|
|
return {"outcome": "completed_by_worker", "turns_used": turns_used, "reason": "worker completed the task"}
|
|
if status == "blocked":
|
|
_log(f"kanban goal loop: task {task_id} blocked by worker after {turns_used} turn(s)")
|
|
return {"outcome": "blocked_by_worker", "turns_used": turns_used, "reason": "worker blocked the task"}
|
|
if status not in ("running", "ready"):
|
|
# Reclaimed / archived / unexpected — let the dispatcher own it.
|
|
_log(f"kanban goal loop: task {task_id} status={status!r}; stopping")
|
|
return {"outcome": "stopped", "turns_used": turns_used, "reason": f"status={status}"}
|
|
|
|
# Still open — judge whether the latest response satisfies the card.
|
|
# The kanban worker loop has no wait-barrier concept (workers finish
|
|
# via kanban_complete / kanban_block, not by parking), so a WAIT
|
|
# verdict is treated as CONTINUE here.
|
|
verdict, reason, _parse_failed, _wait = judge_goal(goal_text, last_response)
|
|
if verdict == "wait":
|
|
verdict = "continue"
|
|
_log(f"kanban goal loop: turn {turns_used}/{max_turns} verdict={verdict} reason={_truncate(reason, 120)}")
|
|
|
|
if verdict == "done":
|
|
if nudged_to_finalize:
|
|
# Already asked once to call kanban_complete and it still
|
|
# didn't — block for review rather than spin.
|
|
_log(f"kanban goal loop: task {task_id} judged done but worker won't finalize; blocking")
|
|
try:
|
|
block_fn(
|
|
f"Goal-mode worker's output looked complete but it never "
|
|
f"called kanban_complete after a finalize nudge ({reason})."
|
|
)
|
|
except Exception as exc:
|
|
_log(f"kanban goal loop: block_fn failed ({exc})")
|
|
return {"outcome": "blocked_budget", "turns_used": turns_used, "reason": "judged done, never finalized"}
|
|
prompt = KANBAN_GOAL_FINALIZE_TEMPLATE.format(reason=_truncate(reason, 400))
|
|
nudged_to_finalize = True
|
|
else:
|
|
prompt = KANBAN_GOAL_CONTINUATION_TEMPLATE.format(reason=_truncate(reason, 400))
|
|
|
|
# Budget check BEFORE spending another turn.
|
|
if turns_used >= max_turns:
|
|
_log(f"kanban goal loop: task {task_id} exhausted {turns_used}/{max_turns} turns; blocking")
|
|
try:
|
|
block_fn(
|
|
f"Goal-mode worker exhausted its turn budget "
|
|
f"({turns_used}/{max_turns}) without completing the task. "
|
|
f"Last judge verdict: {_truncate(reason, 300)}"
|
|
)
|
|
except Exception as exc:
|
|
_log(f"kanban goal loop: block_fn failed ({exc})")
|
|
return {"outcome": "blocked_budget", "turns_used": turns_used, "reason": "turn budget exhausted"}
|
|
|
|
# Run another turn in the same session.
|
|
try:
|
|
last_response = run_turn(prompt) or ""
|
|
except Exception as exc:
|
|
_log(f"kanban goal loop: run_turn failed ({exc}); stopping")
|
|
return {"outcome": "stopped", "turns_used": turns_used, "reason": f"run_turn error: {type(exc).__name__}"}
|
|
turns_used += 1
|
|
|
|
|
|
__all__ = [
|
|
"GoalState",
|
|
"GoalManager",
|
|
"CONTINUATION_PROMPT_TEMPLATE",
|
|
"CONTINUATION_PROMPT_WITH_SUBGOALS_TEMPLATE",
|
|
"JUDGE_USER_PROMPT_TEMPLATE",
|
|
"JUDGE_USER_PROMPT_WITH_SUBGOALS_TEMPLATE",
|
|
"KANBAN_GOAL_CONTINUATION_TEMPLATE",
|
|
"KANBAN_GOAL_FINALIZE_TEMPLATE",
|
|
"DEFAULT_MAX_TURNS",
|
|
"load_goal",
|
|
"save_goal",
|
|
"clear_goal",
|
|
"migrate_goal_to_session",
|
|
"judge_goal",
|
|
"run_kanban_goal_loop",
|
|
]
|