diff --git a/cli.py b/cli.py index ad0a5050aa2..39498e696d4 100644 --- a/cli.py +++ b/cli.py @@ -8460,7 +8460,17 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin): if not last_response.strip(): return - decision = mgr.evaluate_after_turn(last_response, user_initiated=True) + try: + from hermes_cli.goals import gather_background_processes as _gather_bg + _bg_procs = _gather_bg() + except Exception: + _bg_procs = None + + decision = mgr.evaluate_after_turn( + last_response, + user_initiated=True, + background_processes=_bg_procs, + ) msg = decision.get("message") or "" if msg: _cprint(f" {msg}") diff --git a/gateway/run.py b/gateway/run.py index 43bcb62cf32..4f3b12375d6 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7768,16 +7768,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if _cmd_def_inner and _cmd_def_inner.name == "kanban": return await self._handle_kanban_command(event) - # /goal is safe mid-run for status/pause/clear (inspection and - # control-plane only — doesn't interrupt the running turn). + # /goal is safe mid-run for status/pause/clear/wait (inspection + # and control-plane only — doesn't interrupt the running turn). # Setting a new goal text mid-run is rejected with the same # "wait or /stop" message as /model so we don't race a second # continuation prompt against the current turn. if _cmd_def_inner and _cmd_def_inner.name == "goal": _goal_arg = (event.get_command_args() or "").strip().lower() - if not _goal_arg or _goal_arg in {"status", "pause", "resume", "clear", "stop", "done"}: + _goal_verb = _goal_arg.split(None, 1)[0] if _goal_arg else "" + # Exact-match control verbs (unchanged semantics), plus the + # wait/unwait barrier verbs which take a pid argument. 
+ _is_control = ( + not _goal_arg + or _goal_arg in {"status", "pause", "resume", "clear", "stop", "done", "unwait"} + or _goal_verb == "wait" + ) + if _is_control: return await self._handle_goal_command(event) - return "Agent is running — use /goal status / pause / clear mid-run, or /stop before setting a new goal." + return "Agent is running — use /goal status / pause / clear / wait mid-run, or /stop before setting a new goal." # /subgoal is safe mid-run — it only modifies the goal's # subgoals list, which the judge reads at the next turn @@ -10634,7 +10642,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if not mgr.is_active(): return - decision = mgr.evaluate_after_turn(final_response or "", user_initiated=True) + try: + from hermes_cli.goals import gather_background_processes as _gather_bg + _bg_procs = _gather_bg() + except Exception: + _bg_procs = None + + decision = mgr.evaluate_after_turn( + final_response or "", + user_initiated=True, + background_processes=_bg_procs, + ) msg = decision.get("message") or "" # Defer the status line until after the adapter has delivered the diff --git a/gateway/slash_commands.py b/gateway/slash_commands.py index ca519413a07..621492da95c 100644 --- a/gateway/slash_commands.py +++ b/gateway/slash_commands.py @@ -1808,6 +1808,30 @@ class GatewaySlashCommandsMixin: logger.debug("goal clear: pending continuation cleanup failed: %s", exc) return t("gateway.goal_cleared") if had else t("gateway.no_active_goal") + # /goal wait [reason] — park the loop on a background process. + if lower == "wait" or lower.startswith("wait "): + wait_arg = args[len("wait"):].strip() + if not wait_arg: + return "Usage: /goal wait [reason]" + wtokens = wait_arg.split(None, 1) + try: + pid = int(wtokens[0]) + except ValueError: + return "/goal wait: must be an integer process id." 
+ reason = wtokens[1].strip() if len(wtokens) > 1 else "" + try: + mgr.wait_on(pid, reason=reason) + except (RuntimeError, ValueError) as exc: + return f"/goal wait: {exc}" + rtxt = f" ({reason})" if reason else "" + return f"⏳ Goal parked on pid {pid}{rtxt}. Loop pauses until it exits." + + # /goal unwait — clear the wait barrier. + if lower == "unwait": + if mgr.stop_waiting(): + return "▶ Wait barrier cleared — goal loop resumes." + return "No wait barrier set." + # Otherwise — treat the remaining text as the new goal. try: state = mgr.set(args) diff --git a/hermes_cli/cli_commands_mixin.py b/hermes_cli/cli_commands_mixin.py index 831cde7c85b..edd3f42542d 100644 --- a/hermes_cli/cli_commands_mixin.py +++ b/hermes_cli/cli_commands_mixin.py @@ -1821,6 +1821,38 @@ class CLICommandsMixin: _cprint(f" {_DIM}No active goal.{_RST}") return + # /goal wait [reason] — park the loop on a background process so + # it stops re-poking the agent every turn while it waits on CI / a + # build / a long job. The barrier auto-clears when the PID exits. + if lower == "wait" or lower.startswith("wait "): + wait_arg = arg[len("wait"):].strip() + if not wait_arg: + _cprint(" Usage: /goal wait [reason]") + return + wtokens = wait_arg.split(None, 1) + try: + pid = int(wtokens[0]) + except ValueError: + _cprint(" /goal wait: must be an integer process id.") + return + reason = wtokens[1].strip() if len(wtokens) > 1 else "" + try: + mgr.wait_on(pid, reason=reason) + except (RuntimeError, ValueError) as exc: + _cprint(f" /goal wait: {exc}") + return + rtxt = f" ({reason})" if reason else "" + _cprint(f" ⏳ Goal parked on pid {pid}{rtxt}. Loop pauses until it exits.") + return + + # /goal unwait — drop the wait barrier and resume normal looping. + if lower == "unwait": + if mgr.stop_waiting(): + _cprint(" ▶ Wait barrier cleared — goal loop resumes.") + else: + _cprint(f" {_DIM}No wait barrier set.{_RST}") + return + # Otherwise treat the arg as the goal text. 
try: state = mgr.set(arg) diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index d9d9d1b3579..59cb8aa3648 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -108,7 +108,7 @@ COMMAND_REGISTRY: list[CommandDef] = [ CommandDef("steer", "Inject a message after the next tool call without interrupting", "Session", args_hint=""), CommandDef("goal", "Set a standing goal Hermes works on across turns until achieved", "Session", - args_hint="[text | pause | resume | clear | status]"), + args_hint="[text | pause | resume | clear | status | wait | unwait]"), CommandDef("subgoal", "Add or manage extra criteria on the active goal", "Session", args_hint="[text | remove N | clear]"), CommandDef("status", "Show session, model, token, and context info", "Session"), diff --git a/hermes_cli/goals.py b/hermes_cli/goals.py index 8359466e3a0..d9ef82909d8 100644 --- a/hermes_cli/goals.py +++ b/hermes_cli/goals.py @@ -94,25 +94,59 @@ CONTINUATION_PROMPT_WITH_SUBGOALS_TEMPLATE = ( JUDGE_SYSTEM_PROMPT = ( "You are a strict judge evaluating whether an autonomous agent has " - "achieved a user's stated goal. You receive the goal text and the " - "agent's most recent response. Your only job is to decide whether " - "the goal is fully satisfied based on that response.\n\n" - "A goal is DONE only when:\n" + "achieved a user's stated goal. You receive the goal text, the agent's " + "most recent response, and — when present — a list of background " + "processes the agent has running. 
Decide one of three verdicts.\n\n" + "DONE — the goal is fully satisfied:\n" "- The response explicitly confirms the goal was completed, OR\n" "- The response clearly shows the final deliverable was produced, OR\n" "- The response explains the goal is unachievable / blocked / needs " "user input (treat this as DONE with reason describing the block).\n\n" - "Otherwise the goal is NOT done — CONTINUE.\n\n" - "Reply ONLY with a single JSON object on one line:\n" - '{\"done\": , \"reason\": \"\"}' + "WAIT — the goal is NOT done, but the next step is to wait for async " + "work to finish rather than act again. Choose this ONLY when the agent's " + "progress is genuinely gated on something running on its own:\n" + "- A background process listed below is still running AND the response " + "shows the agent is waiting on its result (e.g. a CI poller, build, " + "test run, deploy). If the process has a session id, return it in " + "``wait_on_session`` — that releases when the process exits OR its " + "watch_patterns trigger fires (use this for a long-lived watcher that " + "signals mid-run and may never exit). Otherwise return its pid in " + "``wait_on_pid`` (releases on exit only).\n" + "- The agent says it is rate-limited / backing off / must wait a fixed " + "period — return seconds in ``wait_for_seconds``.\n" + "Picking WAIT parks the loop without burning a turn; it resumes " + "automatically when the pid exits or the time elapses. Do NOT pick WAIT " + "just because work remains — only when re-poking now would be pure " + "busy-work because the agent can't progress until the async thing " + "finishes.\n\n" + "CONTINUE — not done, and there is a concrete next step the agent can " + "take right now. This is the default when in doubt.\n\n" + "Reply ONLY with a single JSON object on one line. 
Shapes:\n" + '{"verdict": "done", "reason": ""}\n' + '{"verdict": "continue", "reason": ""}\n' + '{"verdict": "wait", "wait_on_session": "", "reason": ""}\n' + '{"verdict": "wait", "wait_on_pid": , "reason": ""}\n' + '{"verdict": "wait", "wait_for_seconds": , "reason": ""}\n' + "The legacy shape {\"done\": , \"reason\": \"...\"} is still " + "accepted (true=done, false=continue)." +) + + +# Rendered into the judge prompt when the agent has background processes +# running. Gives the judge the context it needs to decide WAIT vs CONTINUE +# (and which pid to wait on) without it having to probe anything itself. +JUDGE_BACKGROUND_BLOCK_TEMPLATE = ( + "Background processes the agent currently has running (it may be waiting " + "on one of these):\n{background_lines}\n\n" ) JUDGE_USER_PROMPT_TEMPLATE = ( "Goal:\n{goal}\n\n" "Agent's most recent response:\n{response}\n\n" + "{background_block}" "Current time: {current_time}\n\n" - "Is the goal satisfied?" + "Is the goal satisfied — done, continue, or wait?" ) # Used when the user has added /subgoal criteria. The judge must @@ -122,6 +156,7 @@ JUDGE_USER_PROMPT_WITH_SUBGOALS_TEMPLATE = ( "Additional criteria the user added mid-loop (all must also be " "satisfied for the goal to be DONE):\n{subgoals_block}\n\n" "Agent's most recent response:\n{response}\n\n" + "{background_block}" "Current time: {current_time}\n\n" "Decision: For each numbered criterion above, find concrete " "evidence in the agent's response that the criterion is " @@ -129,7 +164,8 @@ JUDGE_USER_PROMPT_WITH_SUBGOALS_TEMPLATE = ( "met' or 'implying it was done' — require specific evidence (a " "file contents excerpt, an output line, a command result). If " "ANY criterion lacks specific evidence in the response, the goal " - "is NOT done — return CONTINUE.\n\n" + "is NOT done — return CONTINUE (or WAIT if blocked on a listed " + "background process).\n\n" "Is the goal AND every additional criterion satisfied?" 
) @@ -159,6 +195,30 @@ class GoalState: # them into the verdict. Backwards-compatible: defaults to empty so # old state_meta rows load unchanged. subgoals: List[str] = field(default_factory=list) + # Wait barrier: when the agent is blocked on long-running async work + # (CI poller, build, test run, deploy, rate-limit cooldown) the goal loop + # PARKS instead of being re-poked every turn into busy-work. Three barrier + # kinds, set automatically by the judge (which now sees the live + # background-process list and can return a ``wait`` verdict) or manually + # via ``/goal wait``: + # • ``waiting_on_pid`` — park until that process exits. + # • ``waiting_on_session`` — park until that process_registry session's + # OWN trigger fires: it exits, OR (if it has watch_patterns) its + # pattern matches. Covers long-lived watchers/servers that signal + # mid-run via a trigger and may never exit. Preferred over raw pid + # when the agent set up a watch_patterns/notify_on_complete process. + # • ``waiting_until`` — park until this wall-clock epoch (time backoff). + # While ANY is active, ``evaluate_after_turn`` short-circuits to + # should_continue=False without burning a turn or calling the judge. The + # barrier auto-clears when the pid exits / the trigger fires / the deadline + # passes, then the next turn resumes normal judging. Cleared by that, + # ``/goal unwait``, pause, resume, or clear. Backwards-compatible: old + # state_meta rows load with no barrier. 
+ waiting_on_pid: Optional[int] = None + waiting_on_session: Optional[str] = None + waiting_until: float = 0.0 + waiting_reason: Optional[str] = None + waiting_since: float = 0.0 def to_json(self) -> str: return json.dumps(asdict(self), ensure_ascii=False) @@ -182,6 +242,11 @@ class GoalState: paused_reason=data.get("paused_reason"), consecutive_parse_failures=int(data.get("consecutive_parse_failures", 0) or 0), subgoals=subgoals, + waiting_on_pid=(int(data["waiting_on_pid"]) if data.get("waiting_on_pid") else None), + waiting_on_session=(str(data["waiting_on_session"]) if data.get("waiting_on_session") else None), + waiting_until=float(data.get("waiting_until", 0.0) or 0.0), + waiting_reason=data.get("waiting_reason"), + waiting_since=float(data.get("waiting_since", 0.0) or 0.0), ) # --- subgoals helpers ------------------------------------------------- @@ -330,6 +395,52 @@ def _truncate(text: str, limit: int) -> str: return text[:limit] + "… [truncated]" +def _pid_alive(pid: int) -> bool: + """Return True if a process with ``pid`` is currently alive. + + Delegates to ``gateway.status._pid_exists`` — the canonical, + cross-platform, footgun-safe liveness check (psutil with a ctypes / + POSIX fallback). Critically this avoids ``os.kill(pid, 0)``, which on + Windows is NOT a no-op: it routes to ``CTRL_C_EVENT`` and hard-kills the + target's console process group (bpo-14484). Any error resolves to False + (treat unknown as dead) so a stale barrier never wedges the loop — the + worst case is the goal resumes one turn early, which is safe. + """ + if not pid or pid <= 0: + return False + try: + from gateway.status import _pid_exists + + return bool(_pid_exists(int(pid))) + except Exception: + pass + # Last-resort fallback if gateway.status is unavailable: psutil directly. 
+ try: + import psutil # type: ignore + + return bool(psutil.pid_exists(int(pid))) + except Exception: + return False + + +def _session_waiting(session_id: str) -> bool: + """Whether a goal parked on a process_registry session should stay parked. + + Delegates to ``process_registry.is_session_waiting`` — True while the + session is running and (if it has watch_patterns) its trigger hasn't fired. + Fail-safe: any import/registry error yields False (don't wait) so a stale + barrier can never wedge the loop. + """ + if not session_id: + return False + try: + from tools.process_registry import process_registry + + return bool(process_registry.is_session_waiting(session_id)) + except Exception: + return False + + _JSON_OBJECT_RE = re.compile(r"\{.*?\}", re.DOTALL) @@ -357,17 +468,25 @@ def _goal_judge_max_tokens() -> int: return DEFAULT_JUDGE_MAX_TOKENS -def _parse_judge_response(raw: str) -> Tuple[bool, str, bool]: - """Parse the judge's reply. Fail-open to ``(False, "", parse_failed)``. +def _parse_judge_response(raw: str) -> Tuple[str, str, bool, Optional[Dict[str, Any]]]: + """Parse the judge's reply. Fail-open on unusable output. - Returns ``(done, reason, parse_failed)``. ``parse_failed`` is True when the - judge returned output that couldn't be interpreted as the expected JSON - verdict (empty body, prose, malformed JSON). Callers use that flag to - auto-pause after N consecutive parse failures so a weak judge model - doesn't silently burn the turn budget. + Returns ``(verdict, reason, parse_failed, wait_directive)`` where: + - ``verdict`` is ``"done"``, ``"continue"``, or ``"wait"``. + - ``parse_failed`` is True when the judge returned output that couldn't + be interpreted as the expected JSON verdict (empty body, prose, + malformed JSON). Callers use it to auto-pause after N consecutive + parse failures so a weak judge model doesn't silently burn the budget. 
+ - ``wait_directive`` is set only for ``verdict == "wait"``: a dict with + ``{"session_id": str}``, ``{"pid": int}`` or ``{"seconds": int}``. + ``None`` otherwise. If a wait verdict carries no usable session id, pid, + or seconds, it is downgraded to ``continue`` (can't park on nothing). + + Accepts both the new ``{"verdict": ...}`` shape and the legacy + ``{"done": <bool>}`` shape. """ if not raw: - return False, "judge returned empty response", True + return "continue", "judge returned empty response", True, None text = raw.strip() @@ -393,17 +512,103 @@ def _parse_judge_response(raw: str) -> Tuple[bool, str, bool]: data = None if not isinstance(data, dict): - return False, f"judge reply was not JSON: {_truncate(raw, 200)!r}", True + return "continue", f"judge reply was not JSON: {_truncate(raw, 200)!r}", True, None - done_val = data.get("done") - if isinstance(done_val, str): - done = done_val.strip().lower() in {"true", "yes", "1", "done"} + reason = str(data.get("reason") or "").strip() or "no reason provided" + + # Determine verdict — prefer the explicit "verdict" field, fall back to + # the legacy "done" boolean. + verdict_raw = data.get("verdict") + if isinstance(verdict_raw, str): + verdict = verdict_raw.strip().lower() else: - done = bool(done_val) - reason = str(data.get("reason") or "").strip() - if not reason: - reason = "no reason provided" - return done, reason, False + done_val = data.get("done") + if isinstance(done_val, str): + done = done_val.strip().lower() in {"true", "yes", "1", "done"} + else: + done = bool(done_val) + verdict = "done" if done else "continue" + + if verdict not in {"done", "continue", "wait"}: + verdict = "continue" + + if verdict != "wait": + return verdict, reason, False, None + + # Wait verdict: extract a concrete directive (session id, pid, or + # seconds). Accept a few key spellings the model might emit. 
+ def _first_int(*keys: str) -> Optional[int]: + for k in keys: + v = data.get(k) + if v is None: + continue + try: + iv = int(v) + if iv > 0: + return iv + except (TypeError, ValueError): + continue + return None + + # Prefer a session-id directive (releases on the process's own trigger — + # exit OR watch-pattern match), then pid (exit only), then seconds. + sess = data.get("wait_on_session") or data.get("session_id") or data.get("wait_session") + if isinstance(sess, str) and sess.strip(): + return "wait", reason, False, {"session_id": sess.strip()} + pid = _first_int("wait_on_pid", "pid", "wait_pid") + if pid is not None: + return "wait", reason, False, {"pid": pid} + seconds = _first_int("wait_for_seconds", "seconds", "wait_seconds") + if seconds is not None: + return "wait", reason, False, {"seconds": seconds} + # Wait with no usable target — can't park on nothing; treat as continue. + return "continue", f"{reason} (wait verdict had no target — continuing)", False, None + + +def _render_background_block(background_processes: Optional[List[Dict[str, Any]]]) -> str: + """Render the live background-process list for the judge prompt. + + Each entry is a ``process_registry.list_sessions()`` dict. Only RUNNING + processes are worth showing (an exited one is nothing to wait on). Returns + an empty string when there's nothing running, so the judge prompt is + byte-identical to the no-background case (no behavior change for the + common path). 
+ """ + if not background_processes: + return "" + lines: List[str] = [] + for p in background_processes: + if not isinstance(p, dict): + continue + if p.get("status") == "exited": + continue + pid = p.get("pid") + if not pid: + continue + cmd = _truncate(str(p.get("command") or "").replace("\n", " ").strip(), 120) + uptime = p.get("uptime_seconds") + tail = _truncate(str(p.get("output_preview") or "").replace("\n", " ").strip(), 120) + sid = p.get("session_id") + line = f"- pid {pid}" + if sid: + line += f" / session {sid}" + line += f": {cmd}" + if uptime is not None: + line += f" (running {uptime}s)" + # Surface the process's own trigger so the judge can wait on a + # mid-run signal (watch-pattern) or completion, not just exit. + wps = p.get("watch_patterns") + if wps: + hit = " [already matched]" if p.get("watch_hit") else "" + line += f" | watch_patterns={wps}{hit}" + elif p.get("notify_on_complete"): + line += " | notify_on_complete" + if tail: + line += f" | recent output: {tail}" + lines.append(line) + if not lines: + return "" + return JUDGE_BACKGROUND_BLOCK_TEMPLATE.format(background_lines="\n".join(lines)) def judge_goal( @@ -412,11 +617,14 @@ def judge_goal( *, timeout: float = DEFAULT_JUDGE_TIMEOUT, subgoals: Optional[List[str]] = None, -) -> Tuple[str, str, bool]: + background_processes: Optional[List[Dict[str, Any]]] = None, +) -> Tuple[str, str, bool, Optional[Dict[str, Any]]]: """Ask the auxiliary model whether the goal is satisfied. - Returns ``(verdict, reason, parse_failed)`` where verdict is ``"done"``, - ``"continue"``, or ``"skipped"`` (when the judge couldn't be reached). + Returns ``(verdict, reason, parse_failed, wait_directive)`` where verdict + is ``"done"``, ``"continue"``, ``"wait"``, or ``"skipped"`` (when the + judge couldn't be reached). ``wait_directive`` is set only for ``"wait"`` + (``{"session_id": str}``, ``{"pid": int}`` or ``{"seconds": int}``); ``None`` otherwise. 
``parse_failed`` is True only when the judge call succeeded but its output was unusable (empty or non-JSON). API/transport errors return False — they @@ -425,37 +633,39 @@ def judge_goal( ``DEFAULT_MAX_CONSECUTIVE_PARSE_FAILURES``). ``subgoals`` is an optional list of user-added criteria (from - ``/subgoal``) that the judge must also factor into its DONE/CONTINUE - decision. When non-empty the prompt switches to the with-subgoals - template; otherwise behavior is identical to the original judge. + ``/subgoal``) factored into the verdict. ``background_processes`` is the + live ``process_registry.list_sessions()`` snapshot; when the agent is + waiting on one (a CI poller, build, etc.) the judge can return a ``wait`` + verdict naming its pid, parking the loop instead of re-poking. - This is deliberately fail-open: any error returns ``("continue", "...", False)`` + This is deliberately fail-open: any error returns ``("continue", ..., False, None)`` so a broken judge doesn't wedge progress — the turn budget and the consecutive-parse-failures auto-pause are the backstops. """ if not goal.strip(): - return "skipped", "empty goal", False + return "skipped", "empty goal", False, None if not last_response.strip(): # No substantive reply this turn — almost certainly not done yet. 
- return "continue", "empty response (nothing to evaluate)", False + return "continue", "empty response (nothing to evaluate)", False, None try: from agent.auxiliary_client import get_auxiliary_extra_body, get_text_auxiliary_client except Exception as exc: logger.debug("goal judge: auxiliary client import failed: %s", exc) - return "continue", "auxiliary client unavailable", False + return "continue", "auxiliary client unavailable", False, None try: client, model = get_text_auxiliary_client("goal_judge") except Exception as exc: logger.debug("goal judge: get_text_auxiliary_client failed: %s", exc) - return "continue", "auxiliary client unavailable", False + return "continue", "auxiliary client unavailable", False, None if client is None or not model: - return "continue", "no auxiliary client configured", False + return "continue", "no auxiliary client configured", False, None # Build the prompt — pick the with-subgoals variant when applicable. clean_subgoals = [s.strip() for s in (subgoals or []) if s and s.strip()] + background_block = _render_background_block(background_processes) current_time = datetime.now(tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S %Z") if clean_subgoals: subgoals_block = "\n".join( @@ -465,12 +675,14 @@ def judge_goal( goal=_truncate(goal, 2000), subgoals_block=_truncate(subgoals_block, 2000), response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS), + background_block=background_block, current_time=current_time, ) else: prompt = JUDGE_USER_PROMPT_TEMPLATE.format( goal=_truncate(goal, 2000), response=_truncate(last_response, _JUDGE_RESPONSE_SNIPPET_CHARS), + background_block=background_block, current_time=current_time, ) @@ -488,17 +700,40 @@ def judge_goal( ) except Exception as exc: logger.info("goal judge: API call failed (%s) — falling through to continue", exc) - return "continue", f"judge error: {type(exc).__name__}", False + return "continue", f"judge error: {type(exc).__name__}", False, None try: raw = 
resp.choices[0].message.content or "" except Exception: raw = "" - done, reason, parse_failed = _parse_judge_response(raw) - verdict = "done" if done else "continue" - logger.info("goal judge: verdict=%s reason=%s", verdict, _truncate(reason, 120)) - return verdict, reason, parse_failed + verdict, reason, parse_failed, wait_directive = _parse_judge_response(raw) + logger.info( + "goal judge: verdict=%s reason=%s%s", + verdict, _truncate(reason, 120), + f" wait={wait_directive}" if wait_directive else "", + ) + return verdict, reason, parse_failed, wait_directive + + +def gather_background_processes(task_id: Optional[str] = None) -> List[Dict[str, Any]]: + """Return the live background-process snapshot for the goal judge. + + Thin, fail-safe wrapper over ``process_registry.list_sessions(task_id)``. + Returns only RUNNING processes (an exited one is nothing to wait on) and + never raises — any import/registry failure yields ``[]`` so the goal loop + degrades to its pre-wait-barrier behavior (judge just won't see processes). + The drivers (CLI + gateway) call this and pass the result into + ``GoalManager.evaluate_after_turn(background_processes=...)``. 
+ """ + try: + from tools.process_registry import process_registry + + sessions = process_registry.list_sessions(task_id=task_id) or [] + except Exception as exc: + logger.debug("gather_background_processes failed: %s", exc) + return [] + return [s for s in sessions if isinstance(s, dict) and s.get("status") != "exited"] # ────────────────────────────────────────────────────────────────────── @@ -547,6 +782,16 @@ class GoalManager: turns = f"{s.turns_used}/{s.max_turns} turns" sub = f", {len(s.subgoals)} subgoal{'s' if len(s.subgoals) != 1 else ''}" if s.subgoals else "" if s.status == "active": + if s.waiting_on_session and _session_waiting(s.waiting_on_session): + wr = s.waiting_reason or f"session {s.waiting_on_session}" + return f"⏳ Goal (parked on {wr}, {turns}{sub}): {s.goal}" + if s.waiting_on_pid and _pid_alive(s.waiting_on_pid): + wr = s.waiting_reason or f"pid {s.waiting_on_pid}" + return f"⏳ Goal (parked on {wr}, {turns}{sub}): {s.goal}" + if s.waiting_until and time.time() < s.waiting_until: + remaining = int(s.waiting_until - time.time()) + wr = s.waiting_reason or f"{remaining}s" + return f"⏳ Goal (parked {remaining}s — {wr}, {turns}{sub}): {s.goal}" return f"⊙ Goal (active, {turns}{sub}): {s.goal}" if s.status == "paused": extra = f" — {s.paused_reason}" if s.paused_reason else "" @@ -578,6 +823,12 @@ class GoalManager: return None self._state.status = "paused" self._state.paused_reason = reason + # A wait barrier is meaningless once paused — drop it. + self._state.waiting_on_pid = None + self._state.waiting_on_session = None + self._state.waiting_until = 0.0 + self._state.waiting_reason = None + self._state.waiting_since = 0.0 save_goal(self.session_id, self._state) return self._state @@ -586,6 +837,12 @@ class GoalManager: return None self._state.status = "active" self._state.paused_reason = None + # Resuming starts fresh — clear any stale barrier. 
+ self._state.waiting_on_pid = None + self._state.waiting_on_session = None + self._state.waiting_until = 0.0 + self._state.waiting_reason = None + self._state.waiting_since = 0.0 if reset_budget: self._state.turns_used = 0 save_goal(self.session_id, self._state) @@ -653,6 +910,123 @@ class GoalManager: return "(no subgoals — use /subgoal to add criteria)" return self._state.render_subgoals_block() + # --- /goal wait barrier ------------------------------------------- + + def wait_on(self, pid: int, reason: str = "") -> GoalState: + """Park the goal loop on a background process PID. + + While the PID is alive, ``evaluate_after_turn`` returns + ``should_continue=False`` without burning a turn or calling the + judge — the loop quiesces instead of re-poking the agent into busy + work. The barrier auto-clears when the process exits. Requires an + active goal. For a process with a watch_patterns/notify_on_complete + trigger, prefer ``wait_on_session`` so a mid-run trigger (not just + exit) releases the barrier. + """ + if self._state is None or self._state.status != "active": + raise RuntimeError("no active goal to park") + pid = int(pid) + if pid <= 0: + raise ValueError("pid must be a positive integer") + self._state.waiting_on_pid = pid + self._state.waiting_on_session = None + self._state.waiting_until = 0.0 + self._state.waiting_reason = (reason or "").strip() or None + self._state.waiting_since = time.time() + save_goal(self.session_id, self._state) + return self._state + + def wait_on_session(self, session_id: str, reason: str = "") -> GoalState: + """Park the goal loop on a process_registry session's OWN trigger. + + Unlike ``wait_on`` (which releases only on PID exit), this releases + when the session's trigger fires: it exits, OR — if it was started + with ``watch_patterns`` — its pattern matches. This is the right + barrier for a long-lived watcher/server/poller that signals mid-run + and may never exit. Requires an active goal. 
+ """ + if self._state is None or self._state.status != "active": + raise RuntimeError("no active goal to park") + session_id = str(session_id or "").strip() + if not session_id: + raise ValueError("session_id must be a non-empty string") + self._state.waiting_on_session = session_id + self._state.waiting_on_pid = None + self._state.waiting_until = 0.0 + self._state.waiting_reason = (reason or "").strip() or None + self._state.waiting_since = time.time() + save_goal(self.session_id, self._state) + return self._state + + def wait_for_seconds(self, seconds: int, reason: str = "") -> GoalState: + """Park the goal loop until ``seconds`` from now have elapsed. + + Time-based counterpart to ``wait_on`` — for backoff / cooldown waits + where there's no process to track (e.g. the agent is rate-limited). + The barrier auto-clears once the deadline passes. Requires an active + goal. + """ + if self._state is None or self._state.status != "active": + raise RuntimeError("no active goal to park") + seconds = int(seconds) + if seconds <= 0: + raise ValueError("seconds must be a positive integer") + self._state.waiting_on_pid = None + self._state.waiting_on_session = None + self._state.waiting_until = time.time() + seconds + self._state.waiting_reason = (reason or "").strip() or None + self._state.waiting_since = time.time() + save_goal(self.session_id, self._state) + return self._state + + def stop_waiting(self) -> bool: + """Clear any active wait barrier (pid / session / time). 
Returns True + if one was cleared.""" + if self._state is None: + return False + if ( + self._state.waiting_on_pid is None + and self._state.waiting_on_session is None + and not self._state.waiting_until + ): + return False + self._state.waiting_on_pid = None + self._state.waiting_on_session = None + self._state.waiting_until = 0.0 + self._state.waiting_reason = None + self._state.waiting_since = 0.0 + save_goal(self.session_id, self._state) + return True + + def is_waiting(self) -> bool: + """True iff a barrier is set AND not yet satisfied. + + Session barrier: active until the process exits or its watch-pattern + trigger fires. Pid barrier: active while the process is alive. Time + barrier: active until the deadline passes. Side effect: a satisfied + barrier is cleared here (lazy auto-clear) so the next evaluation + resumes normal judging. + """ + s = self._state + if s is None: + return False + if s.waiting_on_session is not None: + if _session_waiting(s.waiting_on_session): + return True + self.stop_waiting() # session exited or trigger fired + return False + if s.waiting_on_pid is not None: + if _pid_alive(s.waiting_on_pid): + return True + self.stop_waiting() # process gone + return False + if s.waiting_until: + if time.time() < s.waiting_until: + return True + self.stop_waiting() # deadline passed + return False + return False + # --- the main entry point called after every turn ----------------- def evaluate_after_turn( @@ -660,6 +1034,7 @@ class GoalManager: last_response: str, *, user_initiated: bool = True, + background_processes: Optional[List[Dict[str, Any]]] = None, ) -> Dict[str, Any]: """Run the judge and update state. Return a decision dict. @@ -667,11 +1042,16 @@ class GoalManager: continuation prompt we fed ourselves (False). Both increment ``turns_used`` because both consume model budget. + ``background_processes`` is the live ``process_registry.list_sessions()`` + snapshot for this session. 
It's handed to the judge so it can decide + to WAIT on an in-flight process (CI poller, build, ...) instead of + re-poking the agent — the automatic counterpart to ``/goal wait``. + Decision keys: - ``status``: current goal status after update - ``should_continue``: bool — caller should fire another turn - ``continuation_prompt``: str or None - - ``verdict``: "done" | "continue" | "skipped" | "inactive" + - ``verdict``: "done" | "continue" | "wait" | "skipped" | "inactive" - ``reason``: str - ``message``: user-visible one-liner to print/send """ @@ -686,12 +1066,36 @@ class GoalManager: "message": "", } + # Wait barrier: if the loop is parked (on a live process OR a time + # deadline that hasn't passed), quiesce — do NOT burn a turn or call + # the judge. Resumes automatically once the barrier clears. + if self.is_waiting(): + if state.waiting_on_session is not None: + tgt = f"session {state.waiting_on_session}" + elif state.waiting_on_pid is not None: + tgt = f"pid {state.waiting_on_pid}" + else: + remaining = max(0, int(state.waiting_until - time.time())) + tgt = f"{remaining}s remaining" + reason = state.waiting_reason or tgt + return { + "status": "active", + "should_continue": False, + "continuation_prompt": None, + "verdict": "waiting", + "reason": reason, + "message": f"⏳ Goal parked — waiting on {tgt}: {reason}", + } + # Count the turn that just finished. state.turns_used += 1 state.last_turn_at = time.time() - verdict, reason, parse_failed = judge_goal( - state.goal, last_response, subgoals=state.subgoals or None + verdict, reason, parse_failed, wait_directive = judge_goal( + state.goal, + last_response, + subgoals=state.subgoals or None, + background_processes=background_processes, ) state.last_verdict = verdict state.last_reason = reason @@ -704,6 +1108,31 @@ class GoalManager: else: state.consecutive_parse_failures = 0 + # WAIT verdict: the judge decided the agent is blocked on async work + # and re-poking now would be busy-work. 
Set the barrier and park — + # the turn we just counted stands (the judge call happened), but no + # continuation fires. The loop resumes automatically when the pid + # exits or the deadline passes (next evaluate_after_turn falls through + # the is_waiting() short-circuit once the barrier clears). + if verdict == "wait" and wait_directive: + if wait_directive.get("session_id"): + self.wait_on_session(str(wait_directive["session_id"]), reason=reason) + tgt = f"session {wait_directive['session_id']}" + elif wait_directive.get("pid"): + self.wait_on(int(wait_directive["pid"]), reason=reason) + tgt = f"pid {wait_directive['pid']}" + else: + self.wait_for_seconds(int(wait_directive["seconds"]), reason=reason) + tgt = f"{wait_directive['seconds']}s" + return { + "status": "active", + "should_continue": False, + "continuation_prompt": None, + "verdict": "wait", + "reason": reason, + "message": f"⏳ Goal parked (judge) — waiting on {tgt}: {reason}", + } + if verdict == "done": state.status = "done" save_goal(self.session_id, state) @@ -889,7 +1318,12 @@ def run_kanban_goal_loop( return {"outcome": "stopped", "turns_used": turns_used, "reason": f"status={status}"} # Still open — judge whether the latest response satisfies the card. - verdict, reason, _parse_failed = judge_goal(goal_text, last_response) + # The kanban worker loop has no wait-barrier concept (workers finish + # via kanban_complete / kanban_block, not by parking), so a WAIT + # verdict is treated as CONTINUE here. 
+ verdict, reason, _parse_failed, _wait = judge_goal(goal_text, last_response) + if verdict == "wait": + verdict = "continue" _log(f"kanban goal loop: turn {turns_used}/{max_turns} verdict={verdict} reason={_truncate(reason, 120)}") if verdict == "done": diff --git a/tests/cli/test_cli_goal_interrupt.py b/tests/cli/test_cli_goal_interrupt.py index 0ef04149038..6ab4ce89d2c 100644 --- a/tests/cli/test_cli_goal_interrupt.py +++ b/tests/cli/test_cli_goal_interrupt.py @@ -169,7 +169,7 @@ class TestHealthyTurnStillRuns: # Force the judge to say "continue" without touching the network. with patch( "hermes_cli.goals.judge_goal", - return_value=("continue", "needs more steps", False), + return_value=("continue", "needs more steps", False, None), ): cli._maybe_continue_goal_after_turn() @@ -189,7 +189,7 @@ class TestHealthyTurnStillRuns: with patch( "hermes_cli.goals.judge_goal", - return_value=("done", "goal satisfied", False), + return_value=("done", "goal satisfied", False, None), ): cli._maybe_continue_goal_after_turn() diff --git a/tests/gateway/test_goal_verdict_send.py b/tests/gateway/test_goal_verdict_send.py index 14f536aa4f8..535dbe55542 100644 --- a/tests/gateway/test_goal_verdict_send.py +++ b/tests/gateway/test_goal_verdict_send.py @@ -107,7 +107,7 @@ async def test_goal_verdict_done_sent_via_adapter_send(hermes_home): mgr = GoalManager(session_entry.session_id) mgr.set("ship the feature") - with patch("hermes_cli.goals.judge_goal", return_value=("done", "the feature shipped", False)): + with patch("hermes_cli.goals.judge_goal", return_value=("done", "the feature shipped", False, None)): await runner._post_turn_goal_continuation( session_entry=session_entry, source=src, @@ -136,7 +136,7 @@ async def test_goal_verdict_continue_enqueues_continuation(hermes_home): mgr = GoalManager(session_entry.session_id) mgr.set("polish the docs") - with patch("hermes_cli.goals.judge_goal", return_value=("continue", "still needs work", False)): + with 
patch("hermes_cli.goals.judge_goal", return_value=("continue", "still needs work", False, None)): await runner._post_turn_goal_continuation( session_entry=session_entry, source=src, @@ -164,7 +164,7 @@ async def test_goal_verdict_budget_exhausted_sends_pause(hermes_home): state.turns_used = 2 save_goal(session_entry.session_id, state) - with patch("hermes_cli.goals.judge_goal", return_value=("continue", "keep going", False)): + with patch("hermes_cli.goals.judge_goal", return_value=("continue", "keep going", False, None)): await runner._post_turn_goal_continuation( session_entry=session_entry, source=src, @@ -211,7 +211,7 @@ async def test_goal_verdict_survives_adapter_without_send(hermes_home): runner.adapters[Platform.TELEGRAM] = _NoSendAdapter() - with patch("hermes_cli.goals.judge_goal", return_value=("done", "ok", False)): + with patch("hermes_cli.goals.judge_goal", return_value=("done", "ok", False, None)): # must not raise await runner._post_turn_goal_continuation( session_entry=session_entry, diff --git a/tests/hermes_cli/test_goals.py b/tests/hermes_cli/test_goals.py index 63d00b945ed..2de73e29b9f 100644 --- a/tests/hermes_cli/test_goals.py +++ b/tests/hermes_cli/test_goals.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import time from unittest.mock import patch, MagicMock import pytest @@ -40,23 +41,25 @@ class TestParseJudgeResponse: def test_clean_json_done(self): from hermes_cli.goals import _parse_judge_response - done, reason, _ = _parse_judge_response('{"done": true, "reason": "all good"}') - assert done is True + verdict, reason, _pf, wait = _parse_judge_response('{"done": true, "reason": "all good"}') + assert verdict == "done" assert reason == "all good" + assert wait is None def test_clean_json_continue(self): from hermes_cli.goals import _parse_judge_response - done, reason, _ = _parse_judge_response('{"done": false, "reason": "more work needed"}') - assert done is False + verdict, reason, _pf, wait = 
_parse_judge_response('{"done": false, "reason": "more work needed"}') + assert verdict == "continue" assert reason == "more work needed" + assert wait is None def test_json_in_markdown_fence(self): from hermes_cli.goals import _parse_judge_response raw = '```json\n{"done": true, "reason": "done"}\n```' - done, reason, _ = _parse_judge_response(raw) - assert done is True + verdict, reason, _pf, _w = _parse_judge_response(raw) + assert verdict == "done" assert "done" in reason def test_json_embedded_in_prose(self): @@ -64,33 +67,79 @@ class TestParseJudgeResponse: from hermes_cli.goals import _parse_judge_response raw = 'Looking at this... the agent says X. Verdict: {"done": false, "reason": "partial"}' - done, reason, _ = _parse_judge_response(raw) - assert done is False + verdict, reason, _pf, _w = _parse_judge_response(raw) + assert verdict == "continue" assert reason == "partial" def test_string_done_values(self): from hermes_cli.goals import _parse_judge_response for s in ("true", "yes", "done", "1"): - done, _, _ = _parse_judge_response(f'{{"done": "{s}", "reason": "r"}}') - assert done is True + verdict, _, _, _ = _parse_judge_response(f'{{"done": "{s}", "reason": "r"}}') + assert verdict == "done" for s in ("false", "no", "not yet"): - done, _, _ = _parse_judge_response(f'{{"done": "{s}", "reason": "r"}}') - assert done is False + verdict, _, _, _ = _parse_judge_response(f'{{"done": "{s}", "reason": "r"}}') + assert verdict == "continue" - def test_malformed_json_fails_open(self): - """Non-JSON → not done, with error-ish reason (so judge_goal can map to continue).""" + def test_new_verdict_shape(self): + """The explicit {"verdict": ...} shape is honored.""" from hermes_cli.goals import _parse_judge_response - done, reason, _ = _parse_judge_response("this is not json at all") - assert done is False + v, _, _, _ = _parse_judge_response('{"verdict": "done", "reason": "r"}') + assert v == "done" + v, _, _, _ = _parse_judge_response('{"verdict": "continue", 
"reason": "r"}') + assert v == "continue" + + def test_wait_verdict_with_pid(self): + from hermes_cli.goals import _parse_judge_response + + v, reason, pf, wait = _parse_judge_response( + '{"verdict": "wait", "wait_on_pid": 4242, "reason": "CI running"}' + ) + assert v == "wait" + assert pf is False + assert wait == {"pid": 4242} + assert reason == "CI running" + + def test_wait_verdict_with_seconds(self): + from hermes_cli.goals import _parse_judge_response + + v, _, _, wait = _parse_judge_response( + '{"verdict": "wait", "wait_for_seconds": 90, "reason": "rate limited"}' + ) + assert v == "wait" + assert wait == {"seconds": 90} + + def test_wait_verdict_without_target_downgrades_to_continue(self): + """A wait verdict with no pid/seconds can't park on anything → continue.""" + from hermes_cli.goals import _parse_judge_response + + v, _, pf, wait = _parse_judge_response('{"verdict": "wait", "reason": "vague"}') + assert v == "continue" + assert wait is None + assert pf is False + + def test_unknown_verdict_falls_back_to_continue(self): + from hermes_cli.goals import _parse_judge_response + + v, _, _, _ = _parse_judge_response('{"verdict": "maybe", "reason": "r"}') + assert v == "continue" + + def test_malformed_json_fails_open(self): + """Non-JSON → continue + parse_failed, with error-ish reason.""" + from hermes_cli.goals import _parse_judge_response + + verdict, reason, parse_failed, _w = _parse_judge_response("this is not json at all") + assert verdict == "continue" + assert parse_failed is True assert reason # non-empty def test_empty_response(self): from hermes_cli.goals import _parse_judge_response - done, reason, _ = _parse_judge_response("") - assert done is False + verdict, reason, parse_failed, _w = _parse_judge_response("") + assert verdict == "continue" + assert parse_failed is True assert reason @@ -103,13 +152,13 @@ class TestJudgeGoal: def test_empty_goal_skipped(self): from hermes_cli.goals import judge_goal - verdict, _, _ = judge_goal("", "some 
response") + verdict, _, _, _wd = judge_goal("", "some response") assert verdict == "skipped" def test_empty_response_continues(self): from hermes_cli.goals import judge_goal - verdict, _, _ = judge_goal("ship the thing", "") + verdict, _, _, _wd = judge_goal("ship the thing", "") assert verdict == "continue" def test_no_aux_client_continues(self): @@ -120,7 +169,7 @@ class TestJudgeGoal: "agent.auxiliary_client.get_text_auxiliary_client", return_value=(None, None), ): - verdict, _, _ = goals.judge_goal("my goal", "my response") + verdict, _, _, _wd = goals.judge_goal("my goal", "my response") assert verdict == "continue" def test_api_error_continues(self): @@ -133,7 +182,7 @@ class TestJudgeGoal: "agent.auxiliary_client.get_text_auxiliary_client", return_value=(fake_client, "judge-model"), ): - verdict, reason, _ = goals.judge_goal("goal", "response") + verdict, reason, _, _wd = goals.judge_goal("goal", "response") assert verdict == "continue" assert "judge error" in reason.lower() @@ -152,7 +201,7 @@ class TestJudgeGoal: "agent.auxiliary_client.get_text_auxiliary_client", return_value=(fake_client, "judge-model"), ): - verdict, reason, _ = goals.judge_goal("goal", "agent response") + verdict, reason, _, _wd = goals.judge_goal("goal", "agent response") assert verdict == "done" assert reason == "achieved" @@ -171,7 +220,7 @@ class TestJudgeGoal: "agent.auxiliary_client.get_text_auxiliary_client", return_value=(fake_client, "judge-model"), ): - verdict, reason, _ = goals.judge_goal("goal", "agent response") + verdict, reason, _, _wd = goals.judge_goal("goal", "agent response") assert verdict == "continue" assert reason == "not yet" @@ -260,7 +309,7 @@ class TestGoalManager: mgr = GoalManager(session_id="eval-sid-1") mgr.set("ship it") - with patch.object(goals, "judge_goal", return_value=("done", "shipped", False)): + with patch.object(goals, "judge_goal", return_value=("done", "shipped", False, None)): decision = mgr.evaluate_after_turn("I shipped the feature.") 
assert decision["verdict"] == "done" @@ -276,7 +325,7 @@ class TestGoalManager: mgr = GoalManager(session_id="eval-sid-2", default_max_turns=5) mgr.set("a long goal") - with patch.object(goals, "judge_goal", return_value=("continue", "more work", False)): + with patch.object(goals, "judge_goal", return_value=("continue", "more work", False, None)): decision = mgr.evaluate_after_turn("made some progress") assert decision["verdict"] == "continue" @@ -294,7 +343,7 @@ class TestGoalManager: mgr = GoalManager(session_id="eval-sid-3", default_max_turns=2) mgr.set("hard goal") - with patch.object(goals, "judge_goal", return_value=("continue", "not yet", False)): + with patch.object(goals, "judge_goal", return_value=("continue", "not yet", False, None)): d1 = mgr.evaluate_after_turn("step 1") assert d1["should_continue"] is True assert mgr.state.turns_used == 1 @@ -371,28 +420,28 @@ class TestJudgeParseFailureAutoPause: def test_parse_response_flags_empty_as_parse_failure(self): from hermes_cli.goals import _parse_judge_response - done, reason, parse_failed = _parse_judge_response("") - assert done is False + verdict, reason, parse_failed, _w = _parse_judge_response("") + assert verdict == "continue" assert parse_failed is True assert "empty" in reason.lower() def test_parse_response_flags_non_json_as_parse_failure(self): from hermes_cli.goals import _parse_judge_response - done, reason, parse_failed = _parse_judge_response( + verdict, reason, parse_failed, _w = _parse_judge_response( "Let me analyze whether the goal is fully satisfied based on the agent's response..." 
) - assert done is False + assert verdict == "continue" assert parse_failed is True assert "not json" in reason.lower() def test_parse_response_clean_json_is_not_parse_failure(self): from hermes_cli.goals import _parse_judge_response - done, _, parse_failed = _parse_judge_response( + verdict, _, parse_failed, _w = _parse_judge_response( '{"done": false, "reason": "more work"}' ) - assert done is False + assert verdict == "continue" assert parse_failed is False def test_api_error_does_not_count_as_parse_failure(self): @@ -405,7 +454,7 @@ class TestJudgeParseFailureAutoPause: "agent.auxiliary_client.get_text_auxiliary_client", return_value=(fake_client, "judge-model"), ): - verdict, _, parse_failed = goals.judge_goal("goal", "response") + verdict, _, parse_failed, _wd = goals.judge_goal("goal", "response") assert verdict == "continue" assert parse_failed is False @@ -421,7 +470,7 @@ class TestJudgeParseFailureAutoPause: "agent.auxiliary_client.get_text_auxiliary_client", return_value=(fake_client, "judge-model"), ): - verdict, _, parse_failed = goals.judge_goal("goal", "response") + verdict, _, parse_failed, _wd = goals.judge_goal("goal", "response") assert verdict == "continue" assert parse_failed is True @@ -435,7 +484,7 @@ class TestJudgeParseFailureAutoPause: mgr.set("do a thing") with patch.object( - goals, "judge_goal", return_value=("continue", "judge returned empty response", True) + goals, "judge_goal", return_value=("continue", "judge returned empty response", True, None) ): d1 = mgr.evaluate_after_turn("step 1") assert d1["should_continue"] is True @@ -464,7 +513,7 @@ class TestJudgeParseFailureAutoPause: # Two parse failures… with patch.object( - goals, "judge_goal", return_value=("continue", "not json", True) + goals, "judge_goal", return_value=("continue", "not json", True, None) ): mgr.evaluate_after_turn("step 1") mgr.evaluate_after_turn("step 2") @@ -472,7 +521,7 @@ class TestJudgeParseFailureAutoPause: # …then one clean reply resets the counter. 
with patch.object( - goals, "judge_goal", return_value=("continue", "making progress", False) + goals, "judge_goal", return_value=("continue", "making progress", False, None) ): d = mgr.evaluate_after_turn("step 3") assert d["should_continue"] is True @@ -487,7 +536,7 @@ class TestJudgeParseFailureAutoPause: mgr.set("goal") with patch.object( - goals, "judge_goal", return_value=("continue", "judge error: RuntimeError", False) + goals, "judge_goal", return_value=("continue", "judge error: RuntimeError", False, None) ): for _ in range(5): d = mgr.evaluate_after_turn("still going") @@ -506,7 +555,7 @@ class TestJudgeParseFailureAutoPause: mgr.set("persistent goal") with patch.object( - goals, "judge_goal", return_value=("continue", "empty", True) + goals, "judge_goal", return_value=("continue", "empty", True, None) ): mgr.evaluate_after_turn("r") mgr.evaluate_after_turn("r") @@ -714,7 +763,7 @@ class TestJudgeGoalWithSubgoals: return_value=(_FakeClient, "fake-model")), \ patch("agent.auxiliary_client.get_auxiliary_extra_body", return_value=None): - verdict, reason, parse_failed = goals.judge_goal( + verdict, reason, parse_failed, _wd = goals.judge_goal( "ship the feature", "ok shipped", subgoals=["write tests", "update docs"], @@ -778,3 +827,395 @@ class TestStatusLineSubgoalCount: mgr.add_subgoal("b") line = mgr.status_line() assert "2 subgoals" in line + + +# ────────────────────────────────────────────────────────────────────── +# Wait barrier — parking the goal loop on a background process +# ────────────────────────────────────────────────────────────────────── + + +class TestWaitBarrier: + """The /goal wait barrier parks the loop on a live PID and resumes when + the process exits, without burning turns or calling the judge.""" + + @staticmethod + def _spawn_sleeper(): + """Start a short-lived child process; return its Popen handle.""" + import subprocess + import sys + return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"]) + + 
@staticmethod + def _dead_pid(): + """A PID that is essentially guaranteed not to be running.""" + return 2_000_000_000 + + def test_wait_on_requires_active_goal(self, hermes_home): + from hermes_cli.goals import GoalManager + mgr = GoalManager(session_id="wb-noactive") + with pytest.raises(RuntimeError): + mgr.wait_on(12345) + + def test_wait_on_rejects_bad_pid(self, hermes_home): + from hermes_cli.goals import GoalManager + mgr = GoalManager(session_id="wb-badpid") + mgr.set("g") + with pytest.raises(ValueError): + mgr.wait_on(0) + + def test_parked_on_live_pid_does_not_continue_or_judge(self, hermes_home): + from hermes_cli import goals + from hermes_cli.goals import GoalManager + + proc = self._spawn_sleeper() + try: + mgr = GoalManager(session_id="wb-live") + mgr.set("ship it", max_turns=5) + mgr.wait_on(proc.pid, reason="CI green") + assert mgr.is_waiting() is True + + # The judge must NOT be called while parked, and no turn is burned. + judge = MagicMock(return_value=("continue", "x", False, None)) + with patch.object(goals, "judge_goal", judge): + decision = mgr.evaluate_after_turn("still waiting on CI") + + judge.assert_not_called() + assert decision["verdict"] == "waiting" + assert decision["should_continue"] is False + assert decision["continuation_prompt"] is None + assert mgr.state.turns_used == 0 # no turn consumed while parked + assert "CI green" in decision["message"] + assert mgr.state.status == "active" # still active, just parked + finally: + proc.terminate() + proc.wait(timeout=10) + + def test_barrier_auto_clears_when_process_exits_and_loop_resumes(self, hermes_home): + from hermes_cli import goals + from hermes_cli.goals import GoalManager + + proc = self._spawn_sleeper() + mgr = GoalManager(session_id="wb-exit") + mgr.set("ship it", max_turns=5) + mgr.wait_on(proc.pid, reason="build") + assert mgr.is_waiting() is True + + # Kill the process — barrier should auto-clear and judging resumes. 
+ proc.terminate() + proc.wait(timeout=10) + + assert mgr.is_waiting() is False # lazy auto-clear + assert mgr.state.waiting_on_pid is None + + with patch.object(goals, "judge_goal", return_value=("continue", "more", False, None)): + decision = mgr.evaluate_after_turn("process finished, here are results") + + assert decision["verdict"] == "continue" + assert decision["should_continue"] is True + assert mgr.state.turns_used == 1 # now a turn IS consumed + + def test_dead_pid_never_parks(self, hermes_home): + from hermes_cli import goals + from hermes_cli.goals import GoalManager + + mgr = GoalManager(session_id="wb-dead") + mgr.set("g", max_turns=5) + mgr.wait_on(self._dead_pid(), reason="already-dead") + # is_waiting clears the stale barrier immediately. + assert mgr.is_waiting() is False + + with patch.object(goals, "judge_goal", return_value=("continue", "go", False, None)): + decision = mgr.evaluate_after_turn("response") + assert decision["should_continue"] is True + + def test_stop_waiting_clears_barrier(self, hermes_home): + from hermes_cli.goals import GoalManager + + proc = self._spawn_sleeper() + try: + mgr = GoalManager(session_id="wb-stop") + mgr.set("g") + mgr.wait_on(proc.pid) + assert mgr.is_waiting() is True + assert mgr.stop_waiting() is True + assert mgr.state.waiting_on_pid is None + assert mgr.is_waiting() is False + assert mgr.stop_waiting() is False # idempotent + finally: + proc.terminate() + proc.wait(timeout=10) + + def test_pause_and_resume_clear_barrier(self, hermes_home): + from hermes_cli.goals import GoalManager + + proc = self._spawn_sleeper() + try: + mgr = GoalManager(session_id="wb-pause") + mgr.set("g") + mgr.wait_on(proc.pid) + mgr.pause() + assert mgr.state.waiting_on_pid is None + + mgr.resume() + assert mgr.state.waiting_on_pid is None + finally: + proc.terminate() + proc.wait(timeout=10) + + def test_barrier_persists_and_reloads(self, hermes_home): + from hermes_cli.goals import GoalManager + + proc = self._spawn_sleeper() + 
try: + mgr = GoalManager(session_id="wb-persist") + mgr.set("g") + mgr.wait_on(proc.pid, reason="deploy") + + # Fresh manager loads the persisted barrier. + mgr2 = GoalManager(session_id="wb-persist") + assert mgr2.state.waiting_on_pid == proc.pid + assert mgr2.state.waiting_reason == "deploy" + assert mgr2.is_waiting() is True + finally: + proc.terminate() + proc.wait(timeout=10) + + def test_old_state_row_loads_without_barrier_fields(self, hermes_home): + """Backwards-compat: a state_meta row written before the barrier + existed must load with no barrier.""" + from hermes_cli.goals import GoalState + + legacy = json.dumps({ + "goal": "old goal", + "status": "active", + "turns_used": 2, + "max_turns": 20, + }) + st = GoalState.from_json(legacy) + assert st.goal == "old goal" + assert st.waiting_on_pid is None + assert st.waiting_reason is None + assert st.waiting_since == 0.0 + assert st.waiting_until == 0.0 + + +# ────────────────────────────────────────────────────────────────────── +# Judge-driven auto-wait — the judge parks the loop on its own +# ────────────────────────────────────────────────────────────────────── + + +class TestJudgeDrivenWait: + """The judge returns a `wait` verdict (given live background-process + context) and the loop parks automatically — no manual /goal wait.""" + + @staticmethod + def _spawn_sleeper(): + import subprocess, sys + return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"]) + + def test_judge_wait_pid_parks_loop(self, hermes_home): + from hermes_cli import goals + from hermes_cli.goals import GoalManager + + proc = self._spawn_sleeper() + try: + mgr = GoalManager(session_id="jw-pid", default_max_turns=10) + mgr.set("ship the PR") + # Judge sees the running process and says wait-on-pid. 
+ with patch.object( + goals, "judge_goal", + return_value=("wait", "CI watcher still running", False, {"pid": proc.pid}), + ): + decision = mgr.evaluate_after_turn( + "Pushed the PR, watching CI.", + background_processes=[{ + "pid": proc.pid, "command": "wait_for_pr_green.sh", + "status": "running", "uptime_seconds": 12, + }], + ) + assert decision["verdict"] == "wait" + assert decision["should_continue"] is False + assert decision["continuation_prompt"] is None + assert mgr.state.waiting_on_pid == proc.pid + assert mgr.is_waiting() is True + + # Next turn while still parked: judge must NOT be called again. + judge = MagicMock() + with patch.object(goals, "judge_goal", judge): + d2 = mgr.evaluate_after_turn("still going") + judge.assert_not_called() + assert d2["verdict"] == "waiting" + assert d2["should_continue"] is False + finally: + proc.terminate() + proc.wait(timeout=10) + + def test_judge_wait_seconds_parks_loop(self, hermes_home): + from hermes_cli import goals + from hermes_cli.goals import GoalManager + + mgr = GoalManager(session_id="jw-secs", default_max_turns=10) + mgr.set("retry after backoff") + with patch.object( + goals, "judge_goal", + return_value=("wait", "rate limited", False, {"seconds": 120}), + ): + decision = mgr.evaluate_after_turn("Hit a 429, backing off.") + assert decision["verdict"] == "wait" + assert decision["should_continue"] is False + assert mgr.state.waiting_until > 0 + assert mgr.state.waiting_on_pid is None + assert mgr.is_waiting() is True + + def test_time_barrier_clears_after_deadline(self, hermes_home): + from hermes_cli.goals import GoalManager + + mgr = GoalManager(session_id="jw-deadline") + mgr.set("g") + mgr.wait_for_seconds(120, reason="backoff") + assert mgr.is_waiting() is True + # Force the deadline into the past → barrier auto-clears. 
+ mgr.state.waiting_until = time.time() - 1 + assert mgr.is_waiting() is False + assert mgr.state.waiting_until == 0.0 + + def test_continue_verdict_still_continues_with_background(self, hermes_home): + """A running process present but judge says continue → normal loop.""" + from hermes_cli import goals + from hermes_cli.goals import GoalManager + + mgr = GoalManager(session_id="jw-cont", default_max_turns=10) + mgr.set("do work") + with patch.object( + goals, "judge_goal", + return_value=("continue", "more to do", False, None), + ): + decision = mgr.evaluate_after_turn( + "made progress", + background_processes=[{"pid": 999999, "command": "x", "status": "running"}], + ) + assert decision["verdict"] == "continue" + assert decision["should_continue"] is True + assert mgr.state.waiting_on_pid is None + + +# ────────────────────────────────────────────────────────────────────── +# Session/trigger barrier — wait on a process's OWN trigger, not just exit +# ────────────────────────────────────────────────────────────────────── + + +class TestSessionTriggerBarrier: + """The session barrier (wait_on_session) releases when a process's own + trigger fires — a watch_patterns match mid-run (process may never exit) + OR exit — not only on PID exit. 
CI-safe: uses synthetic registry session + objects, no real child processes.""" + + @staticmethod + def _inject(sid, *, watch_patterns=None, exited=False): + import time as _t + from tools.process_registry import process_registry, ProcessSession + s = ProcessSession(id=sid, command="watcher.sh", task_id="t", + session_key="", cwd="/tmp", started_at=_t.time()) + if watch_patterns: + s.watch_patterns = list(watch_patterns) + s.exited = exited + if exited: + process_registry._finished[sid] = s + else: + process_registry._running[sid] = s + return s, process_registry + + def test_registry_is_session_waiting_running_unmatched(self, hermes_home): + s, reg = self._inject("proc_t1", watch_patterns=["READY"]) + assert reg.is_session_waiting("proc_t1") is True + + def test_registry_releases_on_watch_match_while_alive(self, hermes_home): + s, reg = self._inject("proc_t2", watch_patterns=["READY"]) + assert reg.is_session_waiting("proc_t2") is True + s._watch_hits = 1 # what _check_watch_patterns sets on a match + # Released even though the process is STILL running (never exited). 
+ assert s.exited is False + assert reg.is_session_waiting("proc_t2") is False + + def test_registry_releases_on_exit_plain_session(self, hermes_home): + s, reg = self._inject("proc_t3") # no watch pattern + assert reg.is_session_waiting("proc_t3") is True + s.exited = True + assert reg.is_session_waiting("proc_t3") is False + + def test_registry_unknown_session_never_waits(self, hermes_home): + from tools.process_registry import process_registry + assert process_registry.is_session_waiting("proc_does_not_exist") is False + + def test_goal_parks_on_session_and_releases_on_trigger(self, hermes_home): + from hermes_cli import goals + from hermes_cli.goals import GoalManager + + s, reg = self._inject("proc_t4", watch_patterns=["BUILD SUCCESSFUL"]) + mgr = GoalManager(session_id="st-goal", default_max_turns=10) + mgr.set("wait for the build to succeed") + with patch.object( + goals, "judge_goal", + return_value=("wait", "blocked on build", False, {"session_id": "proc_t4"}), + ): + decision = mgr.evaluate_after_turn( + "Started the build watcher.", + background_processes=[{ + "session_id": "proc_t4", "pid": 4242, "command": "watcher.sh", + "status": "running", "watch_patterns": ["BUILD SUCCESSFUL"], + "watch_hit": False, + }], + ) + assert decision["verdict"] == "wait" + assert mgr.state.waiting_on_session == "proc_t4" + assert mgr.is_waiting() is True + + # Judge must NOT be called again while parked. + judge = MagicMock() + with patch.object(goals, "judge_goal", judge): + d2 = mgr.evaluate_after_turn("still building") + judge.assert_not_called() + assert d2["should_continue"] is False + + # Trigger fires mid-run (process still alive) → barrier releases. + s._watch_hits = 1 + assert mgr.is_waiting() is False + assert mgr.state.waiting_on_session is None + + # Loop resumes with a real judge verdict. 
+ with patch.object(goals, "judge_goal", + return_value=("continue", "build done", False, None)): + d3 = mgr.evaluate_after_turn("build succeeded") + assert d3["should_continue"] is True + + def test_wait_on_session_validation(self, hermes_home): + from hermes_cli.goals import GoalManager + mgr = GoalManager(session_id="st-val") + # No active goal → RuntimeError + try: + mgr.wait_on_session("proc_x") + assert False, "expected RuntimeError" + except RuntimeError: + pass + mgr.set("g") + try: + mgr.wait_on_session("") + assert False, "expected ValueError" + except ValueError: + pass + + def test_session_directive_parsed_from_judge(self, hermes_home): + from hermes_cli.goals import _parse_judge_response + v, _, pf, wd = _parse_judge_response( + '{"verdict": "wait", "wait_on_session": "proc_abc", "reason": "r"}' + ) + assert v == "wait" + assert pf is False + assert wd == {"session_id": "proc_abc"} + + def test_old_state_loads_without_session_field(self, hermes_home): + from hermes_cli.goals import GoalState + st = GoalState.from_json(json.dumps({ + "goal": "g", "status": "active", "turns_used": 0, "max_turns": 20, + })) + assert st.waiting_on_session is None diff --git a/tests/hermes_cli/test_kanban_goal_mode.py b/tests/hermes_cli/test_kanban_goal_mode.py index e8984a1aa62..da0c2ae168f 100644 --- a/tests/hermes_cli/test_kanban_goal_mode.py +++ b/tests/hermes_cli/test_kanban_goal_mode.py @@ -179,9 +179,10 @@ def _patch_judge(monkeypatch, verdicts): """Make judge_goal return a scripted sequence of verdicts.""" seq = list(verdicts) - def _fake_judge(goal, response, subgoals=None): + def _fake_judge(goal, response, subgoals=None, background_processes=None, **_kw): v = seq.pop(0) if seq else "done" - return v, f"scripted:{v}", False + # 4-tuple contract: (verdict, reason, parse_failed, wait_directive) + return v, f"scripted:{v}", False, None monkeypatch.setattr(goals, "judge_goal", _fake_judge) diff --git a/tools/process_registry.py b/tools/process_registry.py index 
c067de0136b..1ed658a92f2 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -1055,6 +1055,42 @@ class ProcessRegistry: """Check if a completion notification was already consumed via wait/log.""" return session_id in self._completion_consumed + def is_session_waiting(self, session_id: str) -> bool: + """Whether a goal loop parked on this session should still be parked. + + Used by the goal-loop wait barrier (``hermes_cli.goals``) to support + waiting on a process's OWN trigger, not just its exit. A session is + "still waiting" when: + - it is still running, AND + - if it has ``watch_patterns``, none has matched yet (so a + long-lived watcher that fires a trigger mid-run — and may never + exit — unblocks the moment its pattern hits, not on exit). + + Returns False (don't wait) when the session has exited, its watch + pattern has already fired, or the session is unknown — so a stale or + already-triggered barrier can never wedge the loop. + """ + if not session_id: + return False + with self._lock: + session = self._running.get(session_id) or self._finished.get(session_id) + if session is None: + return False + # Refresh detached/remote state so .exited is current. + try: + self._refresh_detached_session(session) + except Exception: + pass + if session.exited: + return False + # Watch-pattern process: the trigger is a pattern match, not exit. + # Once any match has been delivered, the wait is satisfied even though + # the process keeps running (server/daemon/watcher case). + if session.watch_patterns and not session._watch_disabled: + if session._watch_hits > 0: + return False + return True + def _drain_should_skip(self, session_id: str) -> bool: """Whether the CLI drain should skip a completion event for this session. 
@@ -1500,6 +1536,14 @@ class ProcessRegistry: "status": "exited" if s.exited else "running", "output_preview": s.output_buffer[-200:] if s.output_buffer else "", } + # Trigger metadata so a goal-loop judge can decide to wait on this + # process's OWN signal (a watch-pattern match or completion), not + # just its exit. A watcher with watch_patterns may never exit. + if s.watch_patterns and not s._watch_disabled: + entry["watch_patterns"] = list(s.watch_patterns) + entry["watch_hit"] = s._watch_hits > 0 + if s.notify_on_complete: + entry["notify_on_complete"] = True if s.exited: entry["exit_code"] = s.exit_code if s.detached: diff --git a/tui_gateway/server.py b/tui_gateway/server.py index c024cc97d89..e8accfa8ba2 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -6716,9 +6716,15 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None: default_max_turns=goal_max_turns, ) if goal_mgr.is_active(): + try: + from hermes_cli.goals import gather_background_processes as _gather_bg + _bg_procs = _gather_bg() + except Exception: + _bg_procs = None decision = goal_mgr.evaluate_after_turn( raw, user_initiated=True, + background_processes=_bg_procs, ) verdict_msg = decision.get("message") or "" if verdict_msg: diff --git a/website/docs/user-guide/features/goals.md b/website/docs/user-guide/features/goals.md index d5302a93068..8e1f4504e33 100644 --- a/website/docs/user-guide/features/goals.md +++ b/website/docs/user-guide/features/goals.md @@ -44,6 +44,8 @@ What you'll see: | `/goal pause` | Stop the auto-continuation loop without clearing the goal. | | `/goal resume` | Resume the loop (resets the turn counter back to zero). | | `/goal clear` | Drop the goal entirely. | +| `/goal wait [reason]` | Park the loop on a background process — it stops re-poking the agent every turn while the process runs, and auto-resumes when it exits. | +| `/goal unwait` | Drop the wait barrier and resume the loop immediately. 
| Works identically on the CLI and every gateway platform (Telegram, Discord, Slack, Matrix, Signal, WhatsApp, SMS, iMessage, Webhook, API server, and the web dashboard). @@ -62,6 +64,29 @@ Subgoals are persisted alongside the goal in `SessionDB.state_meta`, so they sur Use this when you start a loop ("fix the failing tests") and notice partway through that you also want it to "and add a regression test for the bug you just patched" — `/subgoal add a regression test` tightens the success criteria without breaking the running loop. +## Parking on a background process: automatic, with a manual override + +Some goals are gated on something that takes minutes and runs on its own — CI on a pushed PR, a long build, a test matrix, a deploy, a rate-limit cooldown. Without help, the goal loop would re-poke the agent every turn into "is it done yet?" busy-work while it waits. + +**This is handled automatically.** Every turn, the judge is shown the agent's live background processes (the `terminal(background=true)` registry — pid, session id, command, uptime, recent output, and any `watch_patterns` / `notify_on_complete` trigger) alongside the goal and the agent's response. When the agent's progress is genuinely gated on one of them, the judge returns a **`wait`** verdict instead of `continue`, and the loop **parks**: the next turns are skipped (no judge call, no continuation, no turn consumed) until the wait is satisfied — then it resumes normally with the result in hand. The judge can also park on a **time** basis (`wait_for_seconds`) for backoff/cooldown waits. `/goal status` shows `⏳ Goal (parked …)` while parked. + +The judge picks the right kind of wait from the process's own signal: + +- **`wait_on_session <session_id>`** — releases when the process's *own trigger* fires: it exits, **or** (if it was started with `watch_patterns`) its pattern matches. This is the one for a long-lived watcher / server / poller that signals **mid-run** (e.g.
a build process that prints `BUILD SUCCESSFUL` and keeps running, or a `notify_on_complete` watcher) and may never exit on its own. +- **`wait_on_pid <pid>`** — releases on process exit only. +- **`wait_for_seconds <seconds>`** — releases after a fixed delay. + +You don't type anything for this — it's the judge's decision, made from the process context the loop hands it. The manual commands exist as an override: + +| Command | What it does | +|---|---| +| `/goal wait <pid> [reason]` | Manually park the loop until the process with that PID exits. | +| `/goal unwait` | Clear any wait barrier (judge- or manually-set) and resume immediately. | + +The barrier (session-, pid-, or time-based) is persisted with the goal in `SessionDB.state_meta`, so it survives `/resume`. `/goal pause`, `/goal resume`, and `/goal clear` all drop it. If the PID is already dead when the barrier is set (or dies while parked), or the time deadline passes, the barrier clears on the next check — a stale barrier can never wedge the loop. + +Typical flow: the agent pushes a PR, starts a CI watcher with `terminal(background=true, notify_on_complete=true)`, and reports "watching CI." The judge sees the watcher process still running, returns `wait` on its pid, and the loop goes quiet — then picks back up the instant CI finishes and judges the goal against the actual result. + ## Behavior details ### The judge @@ -94,7 +119,7 @@ Any real message you send while a goal is active takes priority over the continu ### Mid-run safety (gateway) -While an agent is already running, `/goal status`, `/goal pause`, and `/goal clear` are safe to run — they only touch control-plane state and don't interrupt the current turn. Setting a **new** goal mid-run (`/goal <text>`) is rejected with a message telling you to `/stop` first, so the old continuation can't race the new one.
+While an agent is already running, `/goal status`, `/goal pause`, `/goal clear`, `/goal wait`, and `/goal unwait` are safe to run — they only touch control-plane state and don't interrupt the current turn. Setting a **new** goal mid-run (`/goal <text>`) is rejected with a message telling you to `/stop` first, so the old continuation can't race the new one. ### Persistence