diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py
index 575c90e32d..f526215094 100644
--- a/hermes_cli/kanban_db.py
+++ b/hermes_cli/kanban_db.py
@@ -573,9 +573,18 @@ class Task:
     tenant: Optional[str]
     result: Optional[str] = None
     idempotency_key: Optional[str] = None
-    spawn_failures: int = 0
+    # Unified non-success counter. Incremented on any of:
+    #   * spawn failure (dispatcher couldn't launch the worker)
+    #   * timed_out outcome (worker exceeded max_runtime_seconds)
+    #   * crashed outcome (worker PID vanished)
+    # Reset to 0 only on a successful completion. See
+    # ``_record_task_failure`` for the circuit-breaker trip rule.
+    # (Pre-rename column: ``spawn_failures``.)
+    consecutive_failures: int = 0
     worker_pid: Optional[int] = None
-    last_spawn_error: Optional[str] = None
+    # Short excerpt of the last failure's error text (any outcome, not
+    # just spawn). Pre-rename column: ``last_spawn_error``.
+    last_failure_error: Optional[str] = None
     max_runtime_seconds: Optional[int] = None
     last_heartbeat_at: Optional[int] = None
     current_run_id: Optional[int] = None
@@ -617,9 +626,15 @@ class Task:
             tenant=row["tenant"] if "tenant" in keys else None,
             result=row["result"] if "result" in keys else None,
             idempotency_key=row["idempotency_key"] if "idempotency_key" in keys else None,
-            spawn_failures=row["spawn_failures"] if "spawn_failures" in keys else 0,
+            consecutive_failures=(
+                row["consecutive_failures"] if "consecutive_failures" in keys
+                else (row["spawn_failures"] if "spawn_failures" in keys else 0)
+            ),
             worker_pid=row["worker_pid"] if "worker_pid" in keys else None,
-            last_spawn_error=row["last_spawn_error"] if "last_spawn_error" in keys else None,
+            last_failure_error=(
+                row["last_failure_error"] if "last_failure_error" in keys
+                else (row["last_spawn_error"] if "last_spawn_error" in keys else None)
+            ),
             max_runtime_seconds=(
                 row["max_runtime_seconds"] if "max_runtime_seconds" in keys else None
             ),
@@ -735,9 +750,14 @@ CREATE TABLE IF NOT EXISTS tasks (
     tenant TEXT,
     result TEXT,
     idempotency_key TEXT,
-    spawn_failures INTEGER NOT NULL DEFAULT 0,
+    -- Unified consecutive-failure counter. Incremented on spawn
+    -- failure, timeout, or crash; reset only on successful completion.
+    -- The circuit breaker in _record_task_failure trips when this
+    -- reaches DEFAULT_FAILURE_LIMIT consecutive non-successes.
+    consecutive_failures INTEGER NOT NULL DEFAULT 0,
     worker_pid INTEGER,
-    last_spawn_error TEXT,
+    -- Short excerpt of the most recent failure's error text.
+    last_failure_error TEXT,
     max_runtime_seconds INTEGER,
     last_heartbeat_at INTEGER,
     -- Pointer into task_runs for the currently-active run (NULL if no
@@ -933,14 +953,35 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
             "CREATE INDEX IF NOT EXISTS idx_tasks_idempotency "
             "ON tasks(idempotency_key)"
         )
-    if "spawn_failures" not in cols:
-        conn.execute(
-            "ALTER TABLE tasks ADD COLUMN spawn_failures INTEGER NOT NULL DEFAULT 0"
-        )
+    # Legacy column rename: ``spawn_failures`` → ``consecutive_failures``
+    # and ``last_spawn_error`` → ``last_failure_error``. The counter was
+    # originally spawn-only; it's now unified across spawn/timeout/
+    # crash outcomes. Rename when only the legacy columns exist to
+    # preserve historical counter values across upgrades. Add fresh
+    # otherwise.
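+    # NOTE: ``ALTER TABLE ... RENAME COLUMN`` requires SQLite >= 3.25.0
+    # (2018-09); on older runtimes it raises sqlite3.OperationalError,
+    # so a copy-and-backfill fallback would be needed there. We assume
+    # all deploy targets ship a newer SQLite.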
+ if "consecutive_failures" not in cols: + if "spawn_failures" in cols: + conn.execute( + "ALTER TABLE tasks RENAME COLUMN spawn_failures TO consecutive_failures" + ) + else: + conn.execute( + "ALTER TABLE tasks ADD COLUMN consecutive_failures " + "INTEGER NOT NULL DEFAULT 0" + ) if "worker_pid" not in cols: conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER") - if "last_spawn_error" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN last_spawn_error TEXT") + if "last_failure_error" not in cols: + if "last_spawn_error" in cols: + conn.execute( + "ALTER TABLE tasks RENAME COLUMN last_spawn_error TO last_failure_error" + ) + else: + conn.execute("ALTER TABLE tasks ADD COLUMN last_failure_error TEXT") if "max_runtime_seconds" not in cols: conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER") if "last_heartbeat_at" not in cols: @@ -1895,6 +1932,11 @@ def reclaim_task( }, run_id=run_id, ) + # Operator intervention — they've looked at the task, so the + # consecutive-failures counter is now stale. Give the next retry + # a fresh budget. (_clear_failure_counter opens its own write_txn, + # so it runs after the enclosing one commits.) + _clear_failure_counter(conn, task_id) return True @@ -2186,6 +2228,11 @@ def complete_task( }, run_id=run_id, ) + # Successful completion — wipe the consecutive-failures counter. + # Failure history stays on the event log for audit; the counter + # just tracks "is there a current pathology the breaker should + # care about", and a success resets that question. + _clear_failure_counter(conn, task_id) # Recompute ready status for dependents (separate txn so children see done). recompute_ready(conn) return True @@ -2444,7 +2491,9 @@ def set_workspace_path( # stops retrying and parks the task in ``blocked`` with a reason so a human # can investigate. Prevents the dispatcher from thrashing forever on a task # whose profile doesn't exist, whose workspace is unmountable, etc. -DEFAULT_SPAWN_FAILURE_LIMIT = 5 +DEFAULT_FAILURE_LIMIT = 5 +# Legacy alias — callers / tests still reference the old name. +DEFAULT_SPAWN_FAILURE_LIMIT = DEFAULT_FAILURE_LIMIT # Max bytes to keep in a single worker log file. The dispatcher truncates # and rotates on spawn if the file is larger than this at spawn time. @@ -2668,6 +2717,20 @@ def enforce_max_runtime( conn, tid, "timed_out", payload, run_id=run_id, ) timed_out.append(tid) + # Increment the unified failure counter. Outside the write_txn + # above because ``_record_task_failure`` opens its own. If the + # breaker trips, this flips the task ``ready → blocked`` and + # emits a ``gave_up`` event on top of the ``timed_out`` we + # already emitted. + if cur.rowcount == 1: + _record_task_failure( + conn, tid, + error=f"elapsed {int(elapsed)}s > limit {int(row['max_runtime_seconds'])}s", + outcome="timed_out", + release_claim=False, + end_run=False, + event_payload_extra={"pid": pid, "sigkill": killed}, + ) return timed_out @@ -2699,6 +2762,10 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: dispatcher (the whole design is single-host). """ crashed: list[str] = [] + # Per-crash details collected inside the main txn, used after it + # closes to run ``_record_task_failure`` (which needs its own + # write_txn so can't nest). 
+    crash_details: list[tuple[str, int, str]] = []  # (task_id, pid, claimer)
     with write_txn(conn):
         rows = conn.execute(
             "SELECT id, worker_pid, claim_lock FROM tasks "
@@ -2734,67 +2801,169 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
                 run_id=run_id,
             )
             crashed.append(row["id"])
+            crash_details.append(
+                (row["id"], int(row["worker_pid"]), row["claim_lock"])
+            )
+    # Outside the main txn: increment the unified failure counter for
+    # each crashed task. If the breaker trips, the task transitions
+    # ready → blocked with a ``gave_up`` event on top of the ``crashed``
+    # event we already emitted.
+    for tid, pid, claimer in crash_details:
+        _record_task_failure(
+            conn, tid,
+            error=f"pid {pid} not alive",
+            outcome="crashed",
+            release_claim=False,
+            end_run=False,
+            event_payload_extra={"pid": pid, "claimer": claimer},
+        )
     return crashed
 
 
+def _record_task_failure(
+    conn: sqlite3.Connection,
+    task_id: str,
+    error: str,
+    *,
+    outcome: str,
+    failure_limit: Optional[int] = None,
+    release_claim: bool = False,
+    end_run: bool = False,
+    event_payload_extra: Optional[dict] = None,
+) -> bool:
+    """Record a non-success outcome (spawn_failed / crashed / timed_out)
+    and maybe trip the circuit breaker.
+
+    Unified replacement for the old spawn-only ``_record_spawn_failure``.
+    Every path that ends a task with a non-success outcome funnels
+    through here so the ``consecutive_failures`` counter and the
+    auto-block threshold stay consistent.
+
+    Returns True when the task was auto-blocked (counter reached
+    ``failure_limit``), False when it was just updated in place.
+
+    Modes:
+
+    * ``release_claim=True, end_run=True`` — spawn-failure path.
+      Caller has a running task with an open run; this transitions
+      it back to ``ready`` (or ``blocked`` when the breaker trips),
+      releases the claim, and closes the run with the given
+      ``outcome``.
+
+    * ``release_claim=False, end_run=False`` — timeout/crash path.
+      Caller has ALREADY flipped the task to ``ready`` and closed the
+      run with the appropriate outcome. This just increments the
+      counter; if the breaker trips, the task is re-transitioned
+      ``ready → blocked`` and a ``gave_up`` event is emitted.
+
+    ``event_payload_extra`` merges into the ``gave_up`` event payload
+    when the breaker trips, so callers can include outcome-specific
+    context (e.g. pid on crash, elapsed on timeout).
+    """
+    if failure_limit is None:
+        failure_limit = DEFAULT_FAILURE_LIMIT
+    blocked = False
+    with write_txn(conn):
+        row = conn.execute(
+            "SELECT consecutive_failures, status FROM tasks WHERE id = ?", (task_id,),
+        ).fetchone()
+        if row is None:
+            return False
+        failures = int(row["consecutive_failures"]) + 1
+        cur_status = row["status"]
+
+        if failures >= failure_limit:
+            # Trip the breaker.
+            if release_claim:
+                # Spawn path: still running, also clear claim state.
+                conn.execute(
+                    "UPDATE tasks SET status = 'blocked', claim_lock = NULL, "
+                    "claim_expires = NULL, worker_pid = NULL, "
+                    "consecutive_failures = ?, last_failure_error = ? "
+                    "WHERE id = ? AND status IN ('running', 'ready')",
+                    (failures, error[:500], task_id),
+                )
+            else:
+                # Timeout/crash path: task is already at ``ready``
+                # with claim cleared; just flip to blocked + update
+                # counter fields.
+                conn.execute(
+                    "UPDATE tasks SET status = 'blocked', "
+                    "consecutive_failures = ?, last_failure_error = ? "
+                    "WHERE id = ? AND status IN ('ready', 'running')",
+                    (failures, error[:500], task_id),
+                )
+            run_id = None
+            if end_run:
+                # Only the spawn path has an open run to close.
+                run_id = _end_run(
+                    conn, task_id,
+                    outcome="gave_up", status="gave_up",
+                    error=error[:500],
+                    metadata={"failures": failures, "trigger_outcome": outcome},
+                )
+            payload = {
+                "failures": failures,
+                "error": error[:500],
+                "trigger_outcome": outcome,
+            }
+            if event_payload_extra:
+                payload.update(event_payload_extra)
+            _append_event(
+                conn, task_id, "gave_up", payload, run_id=run_id,
+            )
+            blocked = True
+        else:
+            # Below threshold.
+            if release_claim:
+                # Spawn path: transition running → ready + clear claim.
+                conn.execute(
+                    "UPDATE tasks SET status = 'ready', claim_lock = NULL, "
+                    "claim_expires = NULL, worker_pid = NULL, "
+                    "consecutive_failures = ?, last_failure_error = ? "
+                    "WHERE id = ? AND status = 'running'",
+                    (failures, error[:500], task_id),
+                )
+            else:
+                # Timeout/crash path: task is already at ``ready`` via
+                # its own UPDATE. Just bookkeep the counter + last error.
+                conn.execute(
+                    "UPDATE tasks SET consecutive_failures = ?, "
+                    "last_failure_error = ? WHERE id = ?",
+                    (failures, error[:500], task_id),
+                )
+            if end_run:
+                # Spawn path: close the open run with outcome.
+                run_id = _end_run(
+                    conn, task_id,
+                    outcome=outcome, status=outcome,
+                    error=error[:500],
+                    metadata={"failures": failures},
+                )
+                _append_event(
+                    conn, task_id, outcome,
+                    {"error": error[:500], "failures": failures},
+                    run_id=run_id,
+                )
+            # Timeout/crash path's caller already emitted its own event.
+    return blocked
+
+
+# Backward-compat alias. Old name is referenced from tests and possibly
+# third-party callers. New code should call ``_record_task_failure``.
 def _record_spawn_failure(
     conn: sqlite3.Connection,
     task_id: str,
     error: str,
     *,
-    failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
+    failure_limit: Optional[int] = None,
 ) -> bool:
-    """Release the claim, increment the failure counter, maybe auto-block.
-
-    Returns True when the task was auto-blocked (N failures exceeded),
-    False when it was just released back to ``ready`` for another try.
-    """
-    blocked = False
-    with write_txn(conn):
-        row = conn.execute(
-            "SELECT spawn_failures FROM tasks WHERE id = ?", (task_id,),
-        ).fetchone()
-        failures = int(row["spawn_failures"]) + 1 if row else 1
-        if failures >= failure_limit:
-            conn.execute(
-                "UPDATE tasks SET status = 'blocked', claim_lock = NULL, "
-                "claim_expires = NULL, worker_pid = NULL, "
-                "spawn_failures = ?, last_spawn_error = ? "
-                "WHERE id = ? AND status IN ('running', 'ready')",
-                (failures, error[:500], task_id),
-            )
-            run_id = _end_run(
-                conn, task_id,
-                outcome="gave_up", status="gave_up",
-                error=error[:500],
-                metadata={"failures": failures},
-            )
-            _append_event(
-                conn, task_id, "gave_up",
-                {"failures": failures, "error": error[:500]},
-                run_id=run_id,
-            )
-            blocked = True
-        else:
-            conn.execute(
-                "UPDATE tasks SET status = 'ready', claim_lock = NULL, "
-                "claim_expires = NULL, worker_pid = NULL, "
-                "spawn_failures = ?, last_spawn_error = ? "
-                "WHERE id = ? AND status = 'running'",
AND status = 'running'", - (failures, error[:500], task_id), - ) - run_id = _end_run( - conn, task_id, - outcome="spawn_failed", status="spawn_failed", - error=error[:500], - metadata={"failures": failures}, - ) - _append_event( - conn, task_id, "spawn_failed", - {"error": error[:500], "failures": failures}, - run_id=run_id, - ) - return blocked + return _record_task_failure( + conn, task_id, error, + outcome="spawn_failed", + failure_limit=failure_limit, + release_claim=True, + end_run=True, + ) def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None: @@ -2818,16 +2987,28 @@ def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None: _append_event(conn, task_id, "spawned", {"pid": int(pid)}, run_id=run_id) -def _clear_spawn_failures(conn: sqlite3.Connection, task_id: str) -> None: - """Reset the failure counter after a successful spawn.""" +def _clear_failure_counter(conn: sqlite3.Connection, task_id: str) -> None: + """Reset the unified consecutive-failures counter. + + Called from ``complete_task`` on successful completion — a fresh + success means the task + profile combination is working and any + past failures are history. NOT called on spawn success anymore: + a successful spawn proves the worker could start but says nothing + about whether the run will succeed, so we need to let timeouts and + crashes accumulate across spawn boundaries. + """ with write_txn(conn): conn.execute( - "UPDATE tasks SET spawn_failures = 0, last_spawn_error = NULL " - "WHERE id = ?", + "UPDATE tasks SET consecutive_failures = 0, " + "last_failure_error = NULL WHERE id = ?", (task_id,), ) +# Legacy alias for test-code and anything else that still imports it. +_clear_spawn_failures = _clear_failure_counter + + def has_spawnable_ready(conn: sqlite3.Connection) -> bool: """Return True iff there is at least one ready+assigned+unclaimed task whose assignee maps to a real Hermes profile. @@ -2964,7 +3145,13 @@ def dispatch_once( pid = _spawn(claimed, str(workspace)) if pid: _set_worker_pid(conn, claimed.id, int(pid)) - _clear_spawn_failures(conn, claimed.id) + # NOTE: we intentionally do NOT reset consecutive_failures + # here. A successful spawn proves the worker can start but + # doesn't prove the run will succeed. Under unified + # failure counting, resetting on spawn would let a task + # that keeps timing out after spawn loop forever. The + # counter is cleared only on successful completion (see + # complete_task). result.spawned.append((claimed.id, claimed.assignee or "", str(workspace))) spawned += 1 except Exception as exc: diff --git a/hermes_cli/kanban_diagnostics.py b/hermes_cli/kanban_diagnostics.py index 5a08ee6df5..d2ba26cb83 100644 --- a/hermes_cli/kanban_diagnostics.py +++ b/hermes_cli/kanban_diagnostics.py @@ -312,21 +312,57 @@ def _rule_prose_phantom_refs(task, events, runs, now, cfg) -> list[Diagnostic]: )] -def _rule_repeated_spawn_failures(task, events, runs, now, cfg) -> list[Diagnostic]: - """Task's ``spawn_failures`` counter is climbing — worker can't - even start. Usually a profile misconfiguration (missing config.yaml, - bad PATH/venv, wrong credentials). +def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]: + """Task's unified ``consecutive_failures`` counter is climbing — + something about this task+profile combo is broken and each retry + fails the same way. 
+    mode (spawn error, timeout, crash) because operationally they
+    all look the same: the kernel keeps retrying and the operator
+    needs to intervene.
 
-    Threshold: cfg["spawn_failure_threshold"] (default 3).
+    Threshold: cfg["failure_threshold"] (default 3). A threshold of 3
+    is two below the circuit-breaker's default (5), so the diagnostic
+    surfaces BEFORE the breaker trips — giving operators a window to
+    fix the problem while the dispatcher's still retrying.
+
+    Accepts the legacy ``spawn_failure_threshold`` config key for
+    back-compat.
     """
-    threshold = int(cfg.get("spawn_failure_threshold", 3))
-    failures = _task_field(task, "spawn_failures", 0)
+    threshold = int(cfg.get(
+        "failure_threshold",
+        cfg.get("spawn_failure_threshold", 3),
+    ))
+    # Read the new unified counter name, with a fallback to the legacy
+    # column name so this rule keeps working against old DB rows the
+    # caller somehow materialised without running the migration.
+    failures = (
+        _task_field(task, "consecutive_failures", None)
+        if _task_field(task, "consecutive_failures", None) is not None
+        else _task_field(task, "spawn_failures", 0)
+    )
     if failures is None or failures < threshold:
         return []
-    last_err = _task_field(task, "last_spawn_error")
+    last_err = (
+        _task_field(task, "last_failure_error", None)
+        if _task_field(task, "last_failure_error", None) is not None
+        else _task_field(task, "last_spawn_error", None)
+    )
     assignee = _task_field(task, "assignee")
+
+    # Classify the most recent failure by peeking at run outcomes so
+    # the title + suggested action can be specific without a separate
+    # per-outcome rule.
+    ordered_runs = sorted(runs, key=lambda r: _task_field(r, "id", 0))
+    most_recent_outcome = None
+    for r in reversed(ordered_runs):
+        oc = _task_field(r, "outcome")
+        if oc in ("spawn_failed", "timed_out", "crashed"):
+            most_recent_outcome = oc
+            break
+
     actions: list[DiagnosticAction] = []
-    if assignee and assignee != "default":
+    if most_recent_outcome == "spawn_failed" and assignee and assignee != "default":
+        # Spawn is failing specifically — profile setup issue.
         actions.append(DiagnosticAction(
             kind="cli_hint",
             label=f"Verify profile: hermes -p {assignee} doctor",
@@ -338,28 +374,49 @@ def _rule_repeated_spawn_failures(task, events, runs, now, cfg) -> list[Diagnost
             label=f"Fix profile auth: hermes -p {assignee} auth",
             payload={"command": f"hermes -p {assignee} auth"},
         ))
-    actions.extend(_generic_recovery_actions(task, running=False))
+    elif most_recent_outcome in ("timed_out", "crashed"):
+        # Worker got off the ground but died. Logs are the right place
+        # to diagnose; reclaim/reassign are the recovery levers.
+        task_id = _task_field(task, "id")
+        if task_id:
+            actions.append(DiagnosticAction(
+                kind="cli_hint",
+                label=f"Check logs: hermes kanban log {task_id}",
+                payload={"command": f"hermes kanban log {task_id}"},
+                suggested=True,
+            ))
+    actions.extend(_generic_recovery_actions(
+        task, running=_task_field(task, "status") == "running",
+    ))
+
     severity = "critical" if failures >= threshold * 2 else "error"
     err_text = (last_err or "").strip() if last_err else ""
     err_snippet = err_text[:500] + ("…" if len(err_text) > 500 else "") if err_text else ""
+    outcome_label = {
+        "spawn_failed": "spawn",
+        "timed_out": "timeout",
+        "crashed": "crash",
+    }.get(most_recent_outcome or "", "failure")
     if err_snippet:
-        title = f"Agent spawn failed {failures}x: {err_snippet.splitlines()[0][:160]}"
+        title = f"Agent {outcome_label} x{failures}: {err_snippet.splitlines()[0][:160]}"
         detail = (
-            f"The dispatcher tried to launch a worker {failures} times "
-            f"and failed every time. Full last error:\n\n{err_snippet}\n\n"
-            f"Common causes: missing config.yaml, bad venv/PATH, or "
-            f"missing credentials for the profile's configured provider."
+            f"This task has failed {failures} times in a row "
+            f"(most recent: {outcome_label}). Full last error:\n\n"
+            f"{err_snippet}\n\n"
+            f"The dispatcher will keep retrying until the consecutive-"
+            f"failures counter trips the circuit breaker (default 5), "
+            f"at which point the task auto-blocks. Fix the root cause "
+            f"and reclaim to retry."
         )
     else:
-        title = f"Agent spawn failed {failures}x (no error recorded)"
+        title = f"Agent {outcome_label} x{failures} (no error recorded)"
         detail = (
-            f"The dispatcher tried to launch a worker {failures} times "
-            f"and failed every time, but no error text was captured. "
-            f"Usually a profile configuration issue — check profile "
-            f"health with the suggested command."
+            f"This task has failed {failures} times in a row "
+            f"(most recent: {outcome_label}) but no error text was "
+            f"captured. Check the suggested command or the worker log."
         )
     return [Diagnostic(
-        kind="repeated_spawn_failures",
+        kind="repeated_failures",
         severity=severity,
         title=title,
         detail=detail,
@@ -367,7 +424,11 @@ def _rule_repeated_spawn_failures(task, events, runs, now, cfg) -> list[Diagnost
         first_seen_at=now,
         last_seen_at=now,
         count=failures,
-        data={"spawn_failures": failures, "last_spawn_error": last_err},
+        data={
+            "consecutive_failures": failures,
+            "most_recent_outcome": most_recent_outcome,
+            "last_error": last_err,
+        },
     )]
 
 
@@ -378,7 +439,23 @@ def _rule_repeated_crashes(task, events, runs, now, cfg) -> list[Diagnostic]:
     broken (OOM, missing dependency, tool it needs is down).
 
     Threshold: cfg["crash_threshold"] (default 2).
+
+    Narrower than ``repeated_failures`` — fires earlier (2 crashes vs 3
+    total failures) so the operator gets a crash-specific heads-up
+    before the unified rule kicks in. Suppresses itself when the
+    unified rule is also firing, to avoid double-flagging.
     """
+    failure_threshold = int(cfg.get(
+        "failure_threshold",
+        cfg.get("spawn_failure_threshold", 3),
+    ))
+    unified_counter = (
+        _task_field(task, "consecutive_failures", 0) or 0
+    )
+    # The unified rule will catch this — defer to it to avoid double-firing.
+    if unified_counter >= failure_threshold:
+        return []
+
     threshold = int(cfg.get("crash_threshold", 2))
     ordered = sorted(runs, key=lambda r: _task_field(r, "id", 0))
     # Count trailing consecutive 'crashed' outcomes.
@@ -498,7 +575,7 @@ def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
 _RULES: list[RuleFn] = [
     _rule_hallucinated_cards,
     _rule_prose_phantom_refs,
-    _rule_repeated_spawn_failures,
+    _rule_repeated_failures,
     _rule_repeated_crashes,
     _rule_stuck_in_blocked,
 ]
@@ -509,13 +586,15 @@ _RULES: list[RuleFn] = [
 DIAGNOSTIC_KINDS = (
     "hallucinated_cards",
     "prose_phantom_refs",
-    "repeated_spawn_failures",
+    "repeated_failures",
     "repeated_crashes",
     "stuck_in_blocked",
 )
 
 DEFAULT_CONFIG = {
+    "failure_threshold": 3,
+    # Legacy alias accepted at read time by _rule_repeated_failures.
     "spawn_failure_threshold": 3,
     "crash_threshold": 2,
     "blocked_stale_hours": 24,
diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py
index 01d623239b..86536596e6 100644
--- a/tests/hermes_cli/test_kanban_core_functionality.py
+++ b/tests/hermes_cli/test_kanban_core_functionality.py
@@ -96,7 +96,7 @@ def test_spawn_failure_auto_blocks_after_limit(kanban_home, all_assignees_spawna
         assert tid not in res.auto_blocked
         task = kb.get_task(conn, tid)
         assert task.status == "ready"
-        assert task.spawn_failures == 3
+        assert task.consecutive_failures == 3
 
         # Two more ticks → fifth failure exceeds the limit.
         res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5)
@@ -105,15 +105,20 @@ def test_spawn_failure_auto_blocks_after_limit(kanban_home, all_assignees_spawna
         assert tid in res2.auto_blocked
         task = kb.get_task(conn, tid)
         assert task.status == "blocked"
-        assert task.spawn_failures >= 5
-        assert task.last_spawn_error and "no PATH" in task.last_spawn_error
+        assert task.consecutive_failures >= 5
+        assert task.last_failure_error and "no PATH" in task.last_failure_error
     finally:
         conn.close()
 
 
-def test_successful_spawn_resets_failure_counter(kanban_home, all_assignees_spawnable):
-    """A successful spawn clears the counter so past failures don't count
-    against future retries of the same task."""
+def test_successful_spawn_does_not_reset_failure_counter(kanban_home, all_assignees_spawnable):
+    """Under unified consecutive-failure counting, a successful spawn
+    does NOT reset the counter — past failures stay on the books until
+    a successful completion. This is by design: it prevents a task
+    that keeps timing out after spawn from looping forever.
+    (Pre-unification behaviour was to reset on spawn success; see the
+    complete_task reset for the replacement point.)
+    """
     calls = [0]
     def _flaky_spawn(task, ws):
         calls[0] += 1
@@ -128,11 +133,12 @@ def test_successful_spawn_resets_failure_counter(kanban_home, all_assignees_spaw
         kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
         kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
         task = kb.get_task(conn, tid)
-        assert task.spawn_failures == 2
+        assert task.consecutive_failures == 2
         kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
         task = kb.get_task(conn, tid)
-        assert task.spawn_failures == 0
-        assert task.last_spawn_error is None
+        # Counter STAYS at 2 — spawn succeeded but run isn't complete yet.
+        assert task.consecutive_failures == 2
+        assert task.last_failure_error is not None
         # Task is now running with a pid.
assert task.status == "running" assert task.worker_pid == 99999 @@ -140,6 +146,30 @@ def test_successful_spawn_resets_failure_counter(kanban_home, all_assignees_spaw conn.close() +def test_successful_completion_resets_failure_counter(kanban_home, all_assignees_spawnable): + """A successful kb.complete_task wipes the counter — the task+profile + combination proved it can succeed, so past failures are history.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + # Simulate 2 prior failures on the record. + kb.write_txn_ctx = kb.write_txn + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET consecutive_failures = 2, " + "last_failure_error = 'old failure' WHERE id = ?", + (tid,), + ) + # Complete the task. + ok = kb.complete_task(conn, tid, summary="done") + assert ok + task = kb.get_task(conn, tid) + assert task.consecutive_failures == 0 + assert task.last_failure_error is None + finally: + conn.close() + + def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spawnable): """`dir:` workspace with no path should fail workspace resolution AND count against the failure budget — not just crash the tick.""" @@ -158,9 +188,9 @@ def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spa ) res = kb.dispatch_once(conn, failure_limit=3) task = kb.get_task(conn, tid) - assert task.spawn_failures == 1 + assert task.consecutive_failures == 1 assert task.status == "ready" - assert task.last_spawn_error and "workspace" in task.last_spawn_error + assert task.last_failure_error and "workspace" in task.last_failure_error # Run twice more → auto-blocked. kb.dispatch_once(conn, failure_limit=3) res = kb.dispatch_once(conn, failure_limit=3) @@ -3052,3 +3082,184 @@ def test_reassign_task_with_reclaim_first_switches_profile(kanban_home): assert row["status"] == "ready" finally: conn.close() + + +# --------------------------------------------------------------------------- +# Unified failure counter — timeout + crash paths increment the same counter +# as spawn failures, and the circuit breaker trips after N consecutive +# failures regardless of which outcome caused them. +# --------------------------------------------------------------------------- + +def test_enforce_max_runtime_increments_consecutive_failures(kanban_home, monkeypatch): + """A single timeout increments consecutive_failures by 1 (was the + infinite-respawn gap before unification).""" + import hermes_cli.kanban_db as _kb + state = {"sent_term": False} + def _alive(pid): + return not state["sent_term"] + def _signal(pid, sig): + import signal as _sig + if sig == _sig.SIGTERM: + state["sent_term"] = True + monkeypatch.setattr(_kb, "_pid_alive", _alive) + + conn = kb.connect() + try: + tid = kb.create_task( + conn, title="overrun", assignee="worker", + max_runtime_seconds=1, + ) + kb.claim_task(conn, tid) + kb._set_worker_pid(conn, tid, os.getpid()) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ? WHERE id = ?", + (int(time.time()) - 30, tid), + ) + before = kb.get_task(conn, tid) + assert before.consecutive_failures == 0 + + kb.enforce_max_runtime(conn, signal_fn=_signal) + + after = kb.get_task(conn, tid) + assert after.consecutive_failures == 1 + assert "elapsed" in (after.last_failure_error or "") + # Task status flipped back to ready (not yet past threshold). 
+ assert after.status == "ready" + finally: + conn.close() + + +def test_repeated_timeouts_trip_the_circuit_breaker(kanban_home, monkeypatch): + """N consecutive timeouts with the unified counter should eventually + hit the failure_limit threshold and auto-block the task. This closes + the Forbidden-Seeds-reported gap where timeout loops never capped. + """ + import hermes_cli.kanban_db as _kb + state = {"sent_term": False} + def _alive(pid): + return not state["sent_term"] + def _signal(pid, sig): + import signal as _sig + if sig == _sig.SIGTERM: + state["sent_term"] = True + monkeypatch.setattr(_kb, "_pid_alive", _alive) + + conn = kb.connect() + try: + tid = kb.create_task( + conn, title="loop forever", assignee="slow-worker", + max_runtime_seconds=1, + ) + # Drop the failure_limit to 3 so we don't need 5 timeouts. + # This uses the module-level DEFAULT; we simulate by calling + # _record_task_failure directly with a tight limit. + for _ in range(3): + # Fresh claim + "started long ago" each iteration. + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET status='running', claim_lock=?, " + "claim_expires=?, worker_pid=?, started_at=? " + "WHERE id=?", + ( + f"{_kb._claimer_id().split(':', 1)[0]}:lock", + int(time.time()) + 3600, + os.getpid(), + int(time.time()) - 30, + tid, + ), + ) + conn.execute( + "INSERT INTO task_runs (task_id, status, claim_lock, " + "claim_expires, worker_pid, started_at) " + "VALUES (?, 'running', ?, ?, ?, ?)", + ( + tid, + f"{_kb._claimer_id().split(':', 1)[0]}:lock", + int(time.time()) + 3600, + os.getpid(), + int(time.time()) - 30, + ), + ) + rid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + conn.execute( + "UPDATE tasks SET current_run_id=? WHERE id=?", + (rid, tid), + ) + state["sent_term"] = False + # Lower the threshold by monkeypatching the default. + monkeypatch.setattr(_kb, "DEFAULT_FAILURE_LIMIT", 3) + kb.enforce_max_runtime(conn, signal_fn=_signal) + + final = kb.get_task(conn, tid) + # After 3 consecutive timeouts with failure_limit=3, task should + # be auto-blocked, not looping forever as ``ready``. + assert final.status == "blocked", \ + f"expected blocked after 3 timeouts, got {final.status}" + assert final.consecutive_failures >= 3 + # ``gave_up`` event emitted (plus 3 ``timed_out`` events). + kinds = [ + r["kind"] for r in conn.execute( + "SELECT kind FROM task_events WHERE task_id=? 
ORDER BY id", + (tid,), + ) + ] + assert kinds.count("timed_out") >= 3 + assert "gave_up" in kinds + finally: + conn.close() + + +def test_detect_crashed_workers_increments_counter(kanban_home): + """A single crash increments the consecutive_failures counter.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="crashy", assignee="worker") + kb.claim_task(conn, tid) + kb._set_worker_pid(conn, tid, 99999) # fake pid — not alive + + kb.detect_crashed_workers(conn) + + task = kb.get_task(conn, tid) + assert task.consecutive_failures == 1 + assert task.status == "ready" + finally: + conn.close() + + +def test_reclaim_task_clears_failure_counter(kanban_home): + """Operator reclaim wipes the counter so the next retry gets a fresh + budget.""" + import secrets + conn = kb.connect() + try: + tid = kb.create_task(conn, title="stuck", assignee="worker") + lock = secrets.token_hex(4) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET status='running', claim_lock=?, " + "claim_expires=?, worker_pid=?, consecutive_failures=4, " + "last_failure_error='prior issue' WHERE id=?", + (lock, int(time.time()) + 3600, 12345, tid), + ) + conn.execute( + "INSERT INTO task_runs (task_id, status, claim_lock, " + "claim_expires, worker_pid, started_at) " + "VALUES (?, 'running', ?, ?, ?, ?)", + (tid, lock, int(time.time()) + 3600, 12345, int(time.time())), + ) + rid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + conn.execute( + "UPDATE tasks SET current_run_id=? WHERE id=?", + (rid, tid), + ) + + ok = kb.reclaim_task(conn, tid, reason="operator fixed config") + assert ok + + task = kb.get_task(conn, tid) + assert task.consecutive_failures == 0 + assert task.last_failure_error is None + assert task.status == "ready" + finally: + conn.close() diff --git a/tests/hermes_cli/test_kanban_diagnostics.py b/tests/hermes_cli/test_kanban_diagnostics.py index 0fabd8558e..d39695ca94 100644 --- a/tests/hermes_cli/test_kanban_diagnostics.py +++ b/tests/hermes_cli/test_kanban_diagnostics.py @@ -39,8 +39,8 @@ def _task(**overrides): "title": "demo task", "assignee": "demo", "status": "ready", - "spawn_failures": 0, - "last_spawn_error": None, + "consecutive_failures": 0, + "last_failure_error": None, } base.update(overrides) return base @@ -126,27 +126,55 @@ def test_prose_phantom_refs_clears_on_later_clean_edit(): assert diags == [] -def test_repeated_spawn_failures_fires_at_threshold(): - task = _task(status="blocked", spawn_failures=3, - last_spawn_error="Profile 'debugger' does not exist") - diags = kd.compute_task_diagnostics(task, [], []) +def test_repeated_failures_fires_at_threshold_on_spawn(): + """A task with multiple spawn_failed runs gets a spawn-flavoured + diagnostic (title mentions 'spawn', suggested action is ``doctor``). + """ + task = _task(status="ready", consecutive_failures=3, + last_failure_error="Profile 'debugger' does not exist") + runs = [ + _run(outcome="spawn_failed", run_id=1), + _run(outcome="spawn_failed", run_id=2), + _run(outcome="spawn_failed", run_id=3), + ] + diags = kd.compute_task_diagnostics(task, [], runs) assert len(diags) == 1 d = diags[0] - assert d.kind == "repeated_spawn_failures" + assert d.kind == "repeated_failures" assert d.severity == "error" # CLI hints are what operators actually need here. 
     suggested = [a.label for a in d.actions if a.suggested]
     assert any("doctor" in s for s in suggested)
 
 
-def test_repeated_spawn_failures_escalates_to_critical():
-    task = _task(spawn_failures=6, last_spawn_error="boom")
+def test_repeated_failures_fires_on_timeout_loop():
+    """The rule surfaces for timeout loops too — that's the point of
+    unifying the counter. Suggested action is 'check logs', not
+    'fix profile'."""
+    task = _task(status="ready", consecutive_failures=3,
+                 last_failure_error="elapsed 600s > limit 300s")
+    runs = [
+        _run(outcome="timed_out", run_id=1),
+        _run(outcome="timed_out", run_id=2),
+        _run(outcome="timed_out", run_id=3),
+    ]
+    diags = kd.compute_task_diagnostics(task, [], runs)
+    assert len(diags) == 1
+    d = diags[0]
+    assert d.kind == "repeated_failures"
+    assert d.data["most_recent_outcome"] == "timed_out"
+    suggested = [a.label for a in d.actions if a.suggested]
+    assert any("log" in s.lower() for s in suggested)
+
+
+def test_repeated_failures_escalates_to_critical():
+    task = _task(consecutive_failures=6, last_failure_error="boom")
     diags = kd.compute_task_diagnostics(task, [], [])
     assert diags[0].severity == "critical"
 
 
-def test_repeated_spawn_failures_below_threshold_silent():
-    task = _task(spawn_failures=2)
+def test_repeated_failures_below_threshold_silent():
+    task = _task(consecutive_failures=2)
     assert kd.compute_task_diagnostics(task, [], []) == []
 
 
@@ -243,9 +271,9 @@ def test_repeated_crashes_no_error_fallback_title():
     assert "no error recorded" in diags[0].title
 
 
-def test_repeated_spawn_failures_surfaces_actual_error_in_title():
-    task = _task(spawn_failures=5,
-                 last_spawn_error="insufficient_quota: billing limit reached")
+def test_repeated_failures_surfaces_actual_error_in_title():
+    task = _task(consecutive_failures=5,
+                 last_failure_error="insufficient_quota: billing limit reached")
     diags = kd.compute_task_diagnostics(task, [], [])
     assert len(diags) == 1
     d = diags[0]
@@ -280,8 +308,8 @@ def test_repeated_crashes_truncates_huge_tracebacks():
 def test_diagnostics_sorted_critical_first():
     """A task with both a critical (many spawn failures) and a warning
     (prose phantoms) diagnostic should list the critical one first."""
-    task = _task(status="done", spawn_failures=10,
-                 last_spawn_error="nope")
+    task = _task(status="done", consecutive_failures=10,
+                 last_failure_error="nope")
     events = [
         _event("completed", ts=100, summary="referenced t_missing"),
         _event("suspected_hallucinated_references", ts=101,
     ]
     diags = kd.compute_task_diagnostics(task, events, [])
     kinds = [d.kind for d in diags]
-    assert kinds[0] == "repeated_spawn_failures"  # critical
+    assert kinds[0] == "repeated_failures"  # critical
     assert "prose_phantom_refs" in kinds
 
 
@@ -346,8 +374,8 @@ def test_broken_rule_is_isolated(monkeypatch):
     # rules should still run and produce their diagnostics.
     monkeypatch.setattr(kd, "_RULES", [_bad_rule] + kd._RULES)
 
-    task = _task(spawn_failures=5, last_spawn_error="e")
+    task = _task(consecutive_failures=5, last_failure_error="e")
     diags = kd.compute_task_diagnostics(task, [], [])
     # The broken rule silently drops, the real one still fires.
     kinds = [d.kind for d in diags]
-    assert "repeated_spawn_failures" in kinds
+    assert "repeated_failures" in kinds
diff --git a/tests/plugins/test_kanban_dashboard_plugin.py b/tests/plugins/test_kanban_dashboard_plugin.py
index 0b6a3510f8..580b187ecc 100644
--- a/tests/plugins/test_kanban_dashboard_plugin.py
+++ b/tests/plugins/test_kanban_dashboard_plugin.py
@@ -1395,7 +1395,7 @@ def test_diagnostics_endpoint_severity_filter(client):
     # An error-severity diagnostic (spawn failures) on another
     p2 = kb.create_task(conn, title="spawn", assignee="b")
     conn.execute(
-        "UPDATE tasks SET spawn_failures=5, last_spawn_error='x' WHERE id=?",
+        "UPDATE tasks SET consecutive_failures=5, last_failure_error='x' WHERE id=?",
        (p2,),
     )
     conn.commit()
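
Reviewer note: the unified-counter lifecycle is easiest to see end to end,
so here is a minimal sketch of the semantics this diff encodes. It is a toy
model, not project code, and assumes only what the docstrings above state:
any of spawn_failed / timed_out / crashed increments one counter, the
breaker trips when the counter reaches DEFAULT_FAILURE_LIMIT (5), the
diagnostic surfaces at failure_threshold (3), and only a successful
completion or an operator reclaim resets. The ``Breaker`` class is a
hypothetical stand-in for the tasks row plus _record_task_failure.

    # Toy model of the unified consecutive-failures circuit breaker.
    DEFAULT_FAILURE_LIMIT = 5   # breaker trips when the counter reaches this
    FAILURE_THRESHOLD = 3       # diagnostics surface here, before the trip

    class Breaker:
        def __init__(self) -> None:
            self.consecutive_failures = 0
            self.status = "ready"

        def record_failure(self, outcome: str) -> None:
            # Any non-success outcome counts, whatever its kind.
            assert outcome in ("spawn_failed", "timed_out", "crashed")
            self.consecutive_failures += 1
            if self.consecutive_failures >= DEFAULT_FAILURE_LIMIT:
                self.status = "blocked"   # tripped: dispatcher stops retrying

        def record_success(self) -> None:
            # The complete_task / reclaim_task reset point.
            self.consecutive_failures = 0
            self.status = "ready"

    b = Breaker()
    for outcome in ("spawn_failed", "timed_out", "timed_out"):
        b.record_failure(outcome)
    assert b.consecutive_failures == FAILURE_THRESHOLD  # diagnostic fires here
    assert b.status == "ready"                          # still retrying
    b.record_failure("crashed")
    b.record_failure("timed_out")
    assert b.status == "blocked"                        # fifth failure trips it

The mixed outcomes are the point: under the old spawn-only counter, the
timed_out and crashed iterations would not have counted, so the task would
have kept respawning forever.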