fix(kanban): unify failure counter across spawn/timeout/crash outcomes (#20410)

The dispatcher's circuit breaker only protected against spawn-side
failures (profile missing, workspace mount error, exec failure).
Workers that successfully spawned but then timed out or crashed
re-queued to ``ready`` with no counter increment, so the next tick
re-spawned them, looping forever until someone noticed. Reported
externally on Twitter (Forbidden Seeds) and confirmed by walking the
kernel: ``enforce_max_runtime`` flipped the task back to ``ready``, emitted
a ``timed_out`` event, and never touched ``spawn_failures``; same for
``detect_crashed_workers``.

Fix: unify the counter across all non-success outcomes.

Schema
------
* ``tasks.spawn_failures`` → ``tasks.consecutive_failures``
* ``tasks.last_spawn_error`` → ``tasks.last_failure_error``
* Migration renames the columns in-place on existing DBs (``ALTER
  TABLE RENAME COLUMN`` — SQLite >= 3.25; a version-guard sketch
  follows this list) so historical counter values are preserved. Row
  mappers fall back to the legacy names in case a row is read from a
  DB where the rename somehow didn't happen.
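
The committed migration assumes a new-enough library; a guard it
could grow, as a sketch using only stdlib ``sqlite3``::

    import sqlite3

    # ``ALTER TABLE ... RENAME COLUMN`` landed in SQLite 3.25.0.
    # ``sqlite3.sqlite_version_info`` is the linked library's version.
    if sqlite3.sqlite_version_info < (3, 25, 0):
        raise RuntimeError(
            "kanban migration needs SQLite >= 3.25, "
            f"got {sqlite3.sqlite_version}"
        )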

Counter lifecycle
-----------------
New helper ``_record_task_failure(conn, task_id, error, *, outcome,
release_claim, end_run, event_payload_extra)`` is the single point
every non-success outcome funnels through:

* ``spawn_failed`` → ``_record_spawn_failure`` (kept as an alias)
  calls it with ``release_claim=True, end_run=True``: transitions
  running → ready, clears the claim, closes the run.
* ``timed_out`` → ``enforce_max_runtime`` already does the status
  transition + run close + event emission, then calls
  ``_record_task_failure`` with ``release_claim=False, end_run=False``
  just to bump the counter (and trip the breaker if needed).
* ``crashed`` → ``detect_crashed_workers`` same pattern, but the
  counter increment runs after the main write_txn closes (SQLite
  doesn't nest write transactions).

If the counter hits the breaker threshold (``DEFAULT_FAILURE_LIMIT=5``,
same as before), the task transitions to ``blocked`` with a ``gave_up``
event on top of whatever outcome-specific event was already emitted.
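
On the event log, a tripped timeout therefore reads as two events:
the ``timed_out`` from ``enforce_max_runtime``, then a ``gave_up``
whose payload merges counter state with the outcome-specific extras
(illustrative values; keys from ``_record_task_failure``)::

    timed_out  (payload emitted by enforce_max_runtime)
    gave_up    {"failures": 5, "error": "elapsed 601s > limit 300s",
                "trigger_outcome": "timed_out", "pid": 4242, "sigkill": true}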

Reset semantics changed: the counter now clears only on successful
``complete_task`` (and operator ``reclaim_task`` — an explicit "I've
looked at this, try again with a fresh budget"). Previously
``_clear_spawn_failures`` ran on every successful spawn, which would
have wiped the counter before a timeout could accumulate past threshold
— exactly the loop this fix prevents.
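
A self-contained toy model of the new semantics (illustrative only;
the real logic lives in ``_record_task_failure`` and ``complete_task``
below)::

    DEFAULT_FAILURE_LIMIT = 5

    class Counter:
        """Toy stand-in for a task's consecutive_failures column."""

        def __init__(self) -> None:
            self.consecutive_failures = 0
            self.blocked = False

        def record_failure(self) -> None:
            # spawn_failed / timed_out / crashed all land here.
            self.consecutive_failures += 1
            if self.consecutive_failures >= DEFAULT_FAILURE_LIMIT:
                self.blocked = True  # breaker trips

        def record_spawn_success(self) -> None:
            pass  # deliberately no longer a reset point

        def record_completion(self) -> None:
            self.consecutive_failures = 0  # only success clears

    c = Counter()
    c.record_failure(); c.record_failure()  # two timeouts -> counter 2
    c.record_spawn_success()                # respawn OK, counter stays 2
    for _ in range(3):
        c.record_failure()                  # 5th failure trips the breaker
    assert c.blocked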

Diagnostics
-----------
* ``_rule_repeated_spawn_failures`` → ``_rule_repeated_failures``. Now
  fires regardless of which outcome is at fault. Classifies the most
  recent failure (spawn_failed / timed_out / crashed) from the run
  history so the title ("Agent timeout x3", "Agent crash x4", "Agent
  spawn x5") and suggested action (``doctor`` for spawn, ``log`` for
  timeout/crash) stay outcome-specific without N duplicate rules.
* ``_rule_repeated_crashes`` kept as a narrower early-warning at
  threshold 2 (vs 3 for the unified rule), but now suppresses itself
  when the unified rule would also fire — avoids double-flagging.
* Diagnostic ``data`` payload now carries
  ``{consecutive_failures, most_recent_outcome, last_error}`` instead
  of spawn-specific keys (example payload below).
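
For a task stuck in a timeout loop the payload would look like this
(illustrative values; the error string format comes from
``enforce_max_runtime``)::

    {
        "consecutive_failures": 3,
        "most_recent_outcome": "timed_out",
        "last_error": "elapsed 600s > limit 300s",
    }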

CLI
---
* ``Task.consecutive_failures`` / ``Task.last_failure_error`` are the
  public fields now. Existing callers that referenced the old names
  get migrated (tests updated in this commit).
* Backward-compat: ``DEFAULT_SPAWN_FAILURE_LIMIT``,
  ``_clear_spawn_failures``, ``_record_spawn_failure`` stay as aliases
  (smoke check below).
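
Two of the aliases are plain module-level bindings
(``_record_spawn_failure`` is a thin wrapper), so legacy imports keep
working; a sketch, with the module path taken from this commit's test
imports::

    import hermes_cli.kanban_db as kb

    assert kb.DEFAULT_SPAWN_FAILURE_LIMIT == kb.DEFAULT_FAILURE_LIMIT
    assert kb._clear_spawn_failures is kb._clear_failure_counter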

Tests
-----
* 6 new kernel tests: timeout increments counter, 3 consecutive
  timeouts trip the breaker (was the reported gap), crash increments
  counter, reclaim clears counter, completion clears counter, spawn
  success does NOT clear counter.
* Diagnostic tests: updated ``repeated_spawn_failures`` cases to use
  the new kind name and add a timeout-loop test.
* Dashboard API test: spawn_failures column update → consecutive_failures.

389/389 kanban-suite tests pass.

Live verification
-----------------
Seeded 4 tasks in an isolated HERMES_HOME: one with 3 timeouts, one
with 4 crashes, one with 2 spawn failures plus 2 timeouts, and one
with prior failures that completed successfully. Board correctly
shows "!! 3 tasks need
attention" (the successful one has no badge because the counter
reset). Drawer for the timeout-loop task renders "Agent timeout x3"
with most_recent_outcome=timed_out and the "Check logs" suggested
action (not the spawn-flavoured "Verify profile"). The successful
task has zero diagnostics.

Closes the Forbidden-Seeds-reported gap.


@ -573,9 +573,18 @@ class Task:
tenant: Optional[str]
result: Optional[str] = None
idempotency_key: Optional[str] = None
spawn_failures: int = 0
# Unified non-success counter. Incremented on any of:
# * spawn failure (dispatcher couldn't launch the worker)
# * timed_out outcome (worker exceeded max_runtime_seconds)
# * crashed outcome (worker PID vanished)
# Reset to 0 only on a successful completion. See
# ``_record_task_failure`` for the circuit-breaker trip rule.
# (Pre-rename column: ``spawn_failures``.)
consecutive_failures: int = 0
worker_pid: Optional[int] = None
last_spawn_error: Optional[str] = None
# Short excerpt of the last failure's error text (any outcome, not
# just spawn). Pre-rename column: ``last_spawn_error``.
last_failure_error: Optional[str] = None
max_runtime_seconds: Optional[int] = None
last_heartbeat_at: Optional[int] = None
current_run_id: Optional[int] = None
@ -617,9 +626,15 @@ class Task:
tenant=row["tenant"] if "tenant" in keys else None,
result=row["result"] if "result" in keys else None,
idempotency_key=row["idempotency_key"] if "idempotency_key" in keys else None,
spawn_failures=row["spawn_failures"] if "spawn_failures" in keys else 0,
consecutive_failures=(
row["consecutive_failures"] if "consecutive_failures" in keys
else (row["spawn_failures"] if "spawn_failures" in keys else 0)
),
worker_pid=row["worker_pid"] if "worker_pid" in keys else None,
last_spawn_error=row["last_spawn_error"] if "last_spawn_error" in keys else None,
last_failure_error=(
row["last_failure_error"] if "last_failure_error" in keys
else (row["last_spawn_error"] if "last_spawn_error" in keys else None)
),
max_runtime_seconds=(
row["max_runtime_seconds"] if "max_runtime_seconds" in keys else None
),
@ -735,9 +750,14 @@ CREATE TABLE IF NOT EXISTS tasks (
tenant TEXT,
result TEXT,
idempotency_key TEXT,
spawn_failures INTEGER NOT NULL DEFAULT 0,
-- Unified consecutive-failure counter. Incremented on spawn
-- failure, timeout, or crash; reset only on successful completion.
-- The circuit breaker in _record_task_failure trips when this
-- reaches DEFAULT_FAILURE_LIMIT consecutive non-successes.
consecutive_failures INTEGER NOT NULL DEFAULT 0,
worker_pid INTEGER,
last_spawn_error TEXT,
-- Short excerpt of the most recent failure's error text.
last_failure_error TEXT,
max_runtime_seconds INTEGER,
last_heartbeat_at INTEGER,
-- Pointer into task_runs for the currently-active run (NULL if no
@ -933,14 +953,31 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
"CREATE INDEX IF NOT EXISTS idx_tasks_idempotency "
"ON tasks(idempotency_key)"
)
if "spawn_failures" not in cols:
conn.execute(
"ALTER TABLE tasks ADD COLUMN spawn_failures INTEGER NOT NULL DEFAULT 0"
)
# Legacy column rename: ``spawn_failures`` → ``consecutive_failures``
# and ``last_spawn_error`` → ``last_failure_error``. The counter was
# originally spawn-only; it's now unified across spawn/timeout/
# crash outcomes. Rename when only the legacy columns exist to
# preserve historical counter values across upgrades. Add fresh
# otherwise.
if "consecutive_failures" not in cols:
if "spawn_failures" in cols:
conn.execute(
"ALTER TABLE tasks RENAME COLUMN spawn_failures TO consecutive_failures"
)
else:
conn.execute(
"ALTER TABLE tasks ADD COLUMN consecutive_failures "
"INTEGER NOT NULL DEFAULT 0"
)
if "worker_pid" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER")
if "last_spawn_error" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN last_spawn_error TEXT")
if "last_failure_error" not in cols:
if "last_spawn_error" in cols:
conn.execute(
"ALTER TABLE tasks RENAME COLUMN last_spawn_error TO last_failure_error"
)
else:
conn.execute("ALTER TABLE tasks ADD COLUMN last_failure_error TEXT")
if "max_runtime_seconds" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER")
if "last_heartbeat_at" not in cols:
@ -1895,6 +1932,11 @@ def reclaim_task(
},
run_id=run_id,
)
# Operator intervention — they've looked at the task, so the
# consecutive-failures counter is now stale. Give the next retry
# a fresh budget. (_clear_failure_counter opens its own write_txn,
# so it runs after the enclosing one commits.)
_clear_failure_counter(conn, task_id)
return True
@ -2186,6 +2228,11 @@ def complete_task(
},
run_id=run_id,
)
# Successful completion — wipe the consecutive-failures counter.
# Failure history stays on the event log for audit; the counter
# just tracks "is there a current pathology the breaker should
# care about", and a success resets that question.
_clear_failure_counter(conn, task_id)
# Recompute ready status for dependents (separate txn so children see done).
recompute_ready(conn)
return True
@ -2444,7 +2491,9 @@ def set_workspace_path(
# stops retrying and parks the task in ``blocked`` with a reason so a human
# can investigate. Prevents the dispatcher from thrashing forever on a task
# whose profile doesn't exist, whose workspace is unmountable, etc.
DEFAULT_SPAWN_FAILURE_LIMIT = 5
DEFAULT_FAILURE_LIMIT = 5
# Legacy alias — callers / tests still reference the old name.
DEFAULT_SPAWN_FAILURE_LIMIT = DEFAULT_FAILURE_LIMIT
# Max bytes to keep in a single worker log file. The dispatcher truncates
# and rotates on spawn if the file is larger than this at spawn time.
@ -2668,6 +2717,20 @@ def enforce_max_runtime(
conn, tid, "timed_out", payload, run_id=run_id,
)
timed_out.append(tid)
# Increment the unified failure counter. Outside the write_txn
# above because ``_record_task_failure`` opens its own. If the
# breaker trips, this flips the task ``ready → blocked`` and
# emits a ``gave_up`` event on top of the ``timed_out`` we
# already emitted.
if cur.rowcount == 1:
_record_task_failure(
conn, tid,
error=f"elapsed {int(elapsed)}s > limit {int(row['max_runtime_seconds'])}s",
outcome="timed_out",
release_claim=False,
end_run=False,
event_payload_extra={"pid": pid, "sigkill": killed},
)
return timed_out
@ -2699,6 +2762,10 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
dispatcher (the whole design is single-host).
"""
crashed: list[str] = []
# Per-crash details collected inside the main txn, used after it
# closes to run ``_record_task_failure`` (which needs its own
# write_txn so can't nest).
crash_details: list[tuple[str, int, str]] = [] # (task_id, pid, claimer)
with write_txn(conn):
rows = conn.execute(
"SELECT id, worker_pid, claim_lock FROM tasks "
@ -2734,67 +2801,169 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
run_id=run_id,
)
crashed.append(row["id"])
crash_details.append(
(row["id"], int(row["worker_pid"]), row["claim_lock"])
)
# Outside the main txn: increment the unified failure counter for
# each crashed task. If the breaker trips, the task transitions
# ready → blocked with a ``gave_up`` event on top of the ``crashed``
# event we already emitted.
for tid, pid, claimer in crash_details:
_record_task_failure(
conn, tid,
error=f"pid {pid} not alive",
outcome="crashed",
release_claim=False,
end_run=False,
event_payload_extra={"pid": pid, "claimer": claimer},
)
return crashed
def _record_task_failure(
conn: sqlite3.Connection,
task_id: str,
error: str,
*,
outcome: str,
failure_limit: Optional[int] = None,
release_claim: bool = False,
end_run: bool = False,
event_payload_extra: Optional[dict] = None,
) -> bool:
"""Record a non-success outcome (spawn_failed / crashed / timed_out)
and maybe trip the circuit breaker.
Unified replacement for the old spawn-only ``_record_spawn_failure``.
Every path that ends a task with a non-success outcome funnels
through here so the ``consecutive_failures`` counter and the
auto-block threshold stay consistent.
Returns True when the task was auto-blocked (counter reached
``failure_limit``), False when it was just updated in place.
Modes:
* ``release_claim=True, end_run=True``: the spawn-failure path.
Caller has a running task with an open run; this transitions
it back to ``ready`` (or ``blocked`` when the breaker trips),
releases the claim, and closes the run with ``outcome=<outcome>``.
* ``release_claim=False, end_run=False``: the timeout/crash path.
Caller has ALREADY flipped the task to ``ready`` and closed the
run with the appropriate outcome. This just increments the
counter; if the breaker trips, the task is re-transitioned
``ready → blocked`` and a ``gave_up`` event is emitted.
``event_payload_extra`` merges into the ``gave_up`` event payload
when the breaker trips, so callers can include outcome-specific
context (e.g. pid on crash, elapsed on timeout).
"""
if failure_limit is None:
failure_limit = DEFAULT_FAILURE_LIMIT
blocked = False
with write_txn(conn):
row = conn.execute(
"SELECT consecutive_failures, status FROM tasks WHERE id = ?", (task_id,),
).fetchone()
if row is None:
return False
failures = int(row["consecutive_failures"]) + 1
cur_status = row["status"]
if failures >= failure_limit:
# Trip the breaker.
if release_claim:
# Spawn path: still running, also clear claim state.
conn.execute(
"UPDATE tasks SET status = 'blocked', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL, "
"consecutive_failures = ?, last_failure_error = ? "
"WHERE id = ? AND status IN ('running', 'ready')",
(failures, error[:500], task_id),
)
else:
# Timeout/crash path: task is already at ``ready``
# with claim cleared; just flip to blocked + update
# counter fields.
conn.execute(
"UPDATE tasks SET status = 'blocked', "
"consecutive_failures = ?, last_failure_error = ? "
"WHERE id = ? AND status IN ('ready', 'running')",
(failures, error[:500], task_id),
)
run_id = None
if end_run:
# Only the spawn path has an open run to close.
run_id = _end_run(
conn, task_id,
outcome="gave_up", status="gave_up",
error=error[:500],
metadata={"failures": failures, "trigger_outcome": outcome},
)
payload = {
"failures": failures,
"error": error[:500],
"trigger_outcome": outcome,
}
if event_payload_extra:
payload.update(event_payload_extra)
_append_event(
conn, task_id, "gave_up", payload, run_id=run_id,
)
blocked = True
else:
# Below threshold.
if release_claim:
# Spawn path: transition running → ready + clear claim.
conn.execute(
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL, "
"consecutive_failures = ?, last_failure_error = ? "
"WHERE id = ? AND status = 'running'",
(failures, error[:500], task_id),
)
else:
# Timeout/crash path: task is already at ``ready`` via
# its own UPDATE. Just bookkeep the counter + last error.
conn.execute(
"UPDATE tasks SET consecutive_failures = ?, "
"last_failure_error = ? WHERE id = ?",
(failures, error[:500], task_id),
)
if end_run:
# Spawn path: close the open run with outcome.
run_id = _end_run(
conn, task_id,
outcome=outcome, status=outcome,
error=error[:500],
metadata={"failures": failures},
)
_append_event(
conn, task_id, outcome,
{"error": error[:500], "failures": failures},
run_id=run_id,
)
# Timeout/crash path's caller already emitted its own event.
return blocked
# Backward-compat alias. Old name is referenced from tests and possibly
# third-party callers. New code should call ``_record_task_failure``.
def _record_spawn_failure(
conn: sqlite3.Connection,
task_id: str,
error: str,
*,
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
failure_limit: Optional[int] = None,
) -> bool:
"""Release the claim, increment the failure counter, maybe auto-block.
Returns True when the task was auto-blocked (N failures exceeded),
False when it was just released back to ``ready`` for another try.
"""
blocked = False
with write_txn(conn):
row = conn.execute(
"SELECT spawn_failures FROM tasks WHERE id = ?", (task_id,),
).fetchone()
failures = int(row["spawn_failures"]) + 1 if row else 1
if failures >= failure_limit:
conn.execute(
"UPDATE tasks SET status = 'blocked', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL, "
"spawn_failures = ?, last_spawn_error = ? "
"WHERE id = ? AND status IN ('running', 'ready')",
(failures, error[:500], task_id),
)
run_id = _end_run(
conn, task_id,
outcome="gave_up", status="gave_up",
error=error[:500],
metadata={"failures": failures},
)
_append_event(
conn, task_id, "gave_up",
{"failures": failures, "error": error[:500]},
run_id=run_id,
)
blocked = True
else:
conn.execute(
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL, "
"spawn_failures = ?, last_spawn_error = ? "
"WHERE id = ? AND status = 'running'",
(failures, error[:500], task_id),
)
run_id = _end_run(
conn, task_id,
outcome="spawn_failed", status="spawn_failed",
error=error[:500],
metadata={"failures": failures},
)
_append_event(
conn, task_id, "spawn_failed",
{"error": error[:500], "failures": failures},
run_id=run_id,
)
return blocked
return _record_task_failure(
conn, task_id, error,
outcome="spawn_failed",
failure_limit=failure_limit,
release_claim=True,
end_run=True,
)
def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None:
@ -2818,16 +2987,28 @@ def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None:
_append_event(conn, task_id, "spawned", {"pid": int(pid)}, run_id=run_id)
def _clear_spawn_failures(conn: sqlite3.Connection, task_id: str) -> None:
"""Reset the failure counter after a successful spawn."""
def _clear_failure_counter(conn: sqlite3.Connection, task_id: str) -> None:
"""Reset the unified consecutive-failures counter.
Called from ``complete_task`` on successful completion: a fresh
success means the task + profile combination is working and any
past failures are history. NOT called on spawn success anymore:
a successful spawn proves the worker could start but says nothing
about whether the run will succeed, so we need to let timeouts and
crashes accumulate across spawn boundaries.
"""
with write_txn(conn):
conn.execute(
"UPDATE tasks SET spawn_failures = 0, last_spawn_error = NULL "
"WHERE id = ?",
"UPDATE tasks SET consecutive_failures = 0, "
"last_failure_error = NULL WHERE id = ?",
(task_id,),
)
# Legacy alias for test-code and anything else that still imports it.
_clear_spawn_failures = _clear_failure_counter
def has_spawnable_ready(conn: sqlite3.Connection) -> bool:
"""Return True iff there is at least one ready+assigned+unclaimed task
whose assignee maps to a real Hermes profile.
@ -2964,7 +3145,13 @@ def dispatch_once(
pid = _spawn(claimed, str(workspace))
if pid:
_set_worker_pid(conn, claimed.id, int(pid))
_clear_spawn_failures(conn, claimed.id)
# NOTE: we intentionally do NOT reset consecutive_failures
# here. A successful spawn proves the worker can start but
# doesn't prove the run will succeed. Under unified
# failure counting, resetting on spawn would let a task
# that keeps timing out after spawn loop forever. The
# counter is cleared only on successful completion (see
# complete_task).
result.spawned.append((claimed.id, claimed.assignee or "", str(workspace)))
spawned += 1
except Exception as exc:


@ -312,21 +312,57 @@ def _rule_prose_phantom_refs(task, events, runs, now, cfg) -> list[Diagnostic]:
)]
def _rule_repeated_spawn_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
"""Task's ``spawn_failures`` counter is climbing — worker can't
even start. Usually a profile misconfiguration (missing config.yaml,
bad PATH/venv, wrong credentials).
def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
"""Task's unified ``consecutive_failures`` counter is climbing —
something about this task+profile combo is broken and each retry
fails the same way. Triggers regardless of the specific failure
mode (spawn error, timeout, crash) because operationally they
all look the same: the kernel keeps retrying and the operator
needs to intervene.
Threshold: cfg["spawn_failure_threshold"] (default 3).
Threshold: cfg["failure_threshold"] (default 3). A threshold of 3
is two below the circuit-breaker's default (5), so the diagnostic
surfaces BEFORE the breaker trips, giving operators a window to
fix the problem while the dispatcher is still retrying.
Accepts the legacy ``spawn_failure_threshold`` config key for
back-compat.
"""
threshold = int(cfg.get("spawn_failure_threshold", 3))
failures = _task_field(task, "spawn_failures", 0)
threshold = int(cfg.get(
"failure_threshold",
cfg.get("spawn_failure_threshold", 3),
))
# Read the new unified counter name, with a fallback to the legacy
# column name so this rule keeps working against old DB rows the
# caller somehow materialised without running the migration.
failures = (
_task_field(task, "consecutive_failures", None)
if _task_field(task, "consecutive_failures", None) is not None
else _task_field(task, "spawn_failures", 0)
)
if failures is None or failures < threshold:
return []
last_err = _task_field(task, "last_spawn_error")
last_err = (
_task_field(task, "last_failure_error", None)
if _task_field(task, "last_failure_error", None) is not None
else _task_field(task, "last_spawn_error", None)
)
assignee = _task_field(task, "assignee")
# Classify the most recent failure by peeking at run outcomes so
# the title + suggested action can be specific without a separate
# per-outcome rule.
ordered_runs = sorted(runs, key=lambda r: _task_field(r, "id", 0))
most_recent_outcome = None
for r in reversed(ordered_runs):
oc = _task_field(r, "outcome")
if oc in ("spawn_failed", "timed_out", "crashed"):
most_recent_outcome = oc
break
actions: list[DiagnosticAction] = []
if assignee and assignee != "default":
if most_recent_outcome == "spawn_failed" and assignee and assignee != "default":
# Spawn is failing specifically — profile setup issue.
actions.append(DiagnosticAction(
kind="cli_hint",
label=f"Verify profile: hermes -p {assignee} doctor",
@ -338,28 +374,49 @@ def _rule_repeated_spawn_failures(task, events, runs, now, cfg) -> list[Diagnost
label=f"Fix profile auth: hermes -p {assignee} auth",
payload={"command": f"hermes -p {assignee} auth"},
))
actions.extend(_generic_recovery_actions(task, running=False))
elif most_recent_outcome in ("timed_out", "crashed"):
# Worker got off the ground but died. Logs are the right place
# to diagnose; reclaim/reassign are the recovery levers.
task_id = _task_field(task, "id")
if task_id:
actions.append(DiagnosticAction(
kind="cli_hint",
label=f"Check logs: hermes kanban log {task_id}",
payload={"command": f"hermes kanban log {task_id}"},
suggested=True,
))
actions.extend(_generic_recovery_actions(
task, running=_task_field(task, "status") == "running",
))
severity = "critical" if failures >= threshold * 2 else "error"
err_text = (last_err or "").strip() if last_err else ""
err_snippet = err_text[:500] + ("…" if len(err_text) > 500 else "") if err_text else ""
outcome_label = {
"spawn_failed": "spawn",
"timed_out": "timeout",
"crashed": "crash",
}.get(most_recent_outcome or "", "failure")
if err_snippet:
title = f"Agent spawn failed {failures}x: {err_snippet.splitlines()[0][:160]}"
title = f"Agent {outcome_label} x{failures}: {err_snippet.splitlines()[0][:160]}"
detail = (
f"The dispatcher tried to launch a worker {failures} times "
f"and failed every time. Full last error:\n\n{err_snippet}\n\n"
f"Common causes: missing config.yaml, bad venv/PATH, or "
f"missing credentials for the profile's configured provider."
f"This task has failed {failures} times in a row "
f"(most recent: {outcome_label}). Full last error:\n\n"
f"{err_snippet}\n\n"
f"The dispatcher will keep retrying until the consecutive-"
f"failures counter trips the circuit breaker (default 5), "
f"at which point the task auto-blocks. Fix the root cause "
f"and reclaim to retry."
)
else:
title = f"Agent spawn failed {failures}x (no error recorded)"
title = f"Agent {outcome_label} x{failures} (no error recorded)"
detail = (
f"The dispatcher tried to launch a worker {failures} times "
f"and failed every time, but no error text was captured. "
f"Usually a profile configuration issue — check profile "
f"health with the suggested command."
f"This task has failed {failures} times in a row "
f"(most recent: {outcome_label}) but no error text was "
f"captured. Check the suggested command or the worker log."
)
return [Diagnostic(
kind="repeated_spawn_failures",
kind="repeated_failures",
severity=severity,
title=title,
detail=detail,
@ -367,7 +424,11 @@ def _rule_repeated_spawn_failures(task, events, runs, now, cfg) -> list[Diagnost
first_seen_at=now,
last_seen_at=now,
count=failures,
data={"spawn_failures": failures, "last_spawn_error": last_err},
data={
"consecutive_failures": failures,
"most_recent_outcome": most_recent_outcome,
"last_error": last_err,
},
)]
@ -378,7 +439,23 @@ def _rule_repeated_crashes(task, events, runs, now, cfg) -> list[Diagnostic]:
broken (OOM, missing dependency, tool it needs is down).
Threshold: cfg["crash_threshold"] (default 2).
Narrower than ``repeated_failures`` and fires earlier (2 crashes vs 3
total failures), so the operator gets a crash-specific heads-up
before the unified rule kicks in. Suppresses itself when the
unified rule is also about to fire, to avoid double-flagging.
"""
failure_threshold = int(cfg.get(
"failure_threshold",
cfg.get("spawn_failure_threshold", 3),
))
unified_counter = (
_task_field(task, "consecutive_failures", 0) or 0
)
# The unified rule will catch this; defer to it to avoid double-flagging.
if unified_counter >= failure_threshold:
return []
threshold = int(cfg.get("crash_threshold", 2))
ordered = sorted(runs, key=lambda r: _task_field(r, "id", 0))
# Count trailing consecutive 'crashed' outcomes.
@ -498,7 +575,7 @@ def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
_RULES: list[RuleFn] = [
_rule_hallucinated_cards,
_rule_prose_phantom_refs,
_rule_repeated_spawn_failures,
_rule_repeated_failures,
_rule_repeated_crashes,
_rule_stuck_in_blocked,
]
@ -509,13 +586,15 @@ _RULES: list[RuleFn] = [
DIAGNOSTIC_KINDS = (
"hallucinated_cards",
"prose_phantom_refs",
"repeated_spawn_failures",
"repeated_failures",
"repeated_crashes",
"stuck_in_blocked",
)
DEFAULT_CONFIG = {
"failure_threshold": 3,
# Legacy alias accepted at read time by _rule_repeated_failures.
"spawn_failure_threshold": 3,
"crash_threshold": 2,
"blocked_stale_hours": 24,


@ -96,7 +96,7 @@ def test_spawn_failure_auto_blocks_after_limit(kanban_home, all_assignees_spawna
assert tid not in res.auto_blocked
task = kb.get_task(conn, tid)
assert task.status == "ready"
assert task.spawn_failures == 3
assert task.consecutive_failures == 3
# Two more ticks → fifth failure exceeds the limit.
res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5)
@ -105,15 +105,20 @@ def test_spawn_failure_auto_blocks_after_limit(kanban_home, all_assignees_spawna
assert tid in res2.auto_blocked
task = kb.get_task(conn, tid)
assert task.status == "blocked"
assert task.spawn_failures >= 5
assert task.last_spawn_error and "no PATH" in task.last_spawn_error
assert task.consecutive_failures >= 5
assert task.last_failure_error and "no PATH" in task.last_failure_error
finally:
conn.close()
def test_successful_spawn_resets_failure_counter(kanban_home, all_assignees_spawnable):
"""A successful spawn clears the counter so past failures don't count
against future retries of the same task."""
def test_successful_spawn_does_not_reset_failure_counter(kanban_home, all_assignees_spawnable):
"""Under unified consecutive-failure counting, a successful spawn
does NOT reset the counter; past failures stay on the books until
a successful completion. This is by design: it prevents a task
that keeps timing out after spawn from looping forever.
(Pre-unification behaviour was to reset on spawn success; see the
complete_task reset for the replacement point.)
"""
calls = [0]
def _flaky_spawn(task, ws):
calls[0] += 1
@ -128,11 +133,12 @@ def test_successful_spawn_resets_failure_counter(kanban_home, all_assignees_spaw
kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
task = kb.get_task(conn, tid)
assert task.spawn_failures == 2
assert task.consecutive_failures == 2
kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5)
task = kb.get_task(conn, tid)
assert task.spawn_failures == 0
assert task.last_spawn_error is None
# Counter STAYS at 2 — spawn succeeded but run isn't complete yet.
assert task.consecutive_failures == 2
assert task.last_failure_error is not None
# Task is now running with a pid.
assert task.status == "running"
assert task.worker_pid == 99999
@ -140,6 +146,30 @@ def test_successful_spawn_resets_failure_counter(kanban_home, all_assignees_spaw
conn.close()
def test_successful_completion_resets_failure_counter(kanban_home, all_assignees_spawnable):
"""A successful kb.complete_task wipes the counter — the task+profile
combination proved it can succeed, so past failures are history."""
conn = kb.connect()
try:
tid = kb.create_task(conn, title="x", assignee="worker")
# Simulate 2 prior failures on the record.
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET consecutive_failures = 2, "
"last_failure_error = 'old failure' WHERE id = ?",
(tid,),
)
# Complete the task.
ok = kb.complete_task(conn, tid, summary="done")
assert ok
task = kb.get_task(conn, tid)
assert task.consecutive_failures == 0
assert task.last_failure_error is None
finally:
conn.close()
def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spawnable):
"""`dir:` workspace with no path should fail workspace resolution AND
count against the failure budget, not just crash the tick."""
@ -158,9 +188,9 @@ def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spa
)
res = kb.dispatch_once(conn, failure_limit=3)
task = kb.get_task(conn, tid)
assert task.spawn_failures == 1
assert task.consecutive_failures == 1
assert task.status == "ready"
assert task.last_spawn_error and "workspace" in task.last_spawn_error
assert task.last_failure_error and "workspace" in task.last_failure_error
# Run twice more → auto-blocked.
kb.dispatch_once(conn, failure_limit=3)
res = kb.dispatch_once(conn, failure_limit=3)
@ -3052,3 +3082,184 @@ def test_reassign_task_with_reclaim_first_switches_profile(kanban_home):
assert row["status"] == "ready"
finally:
conn.close()
# ---------------------------------------------------------------------------
# Unified failure counter — timeout + crash paths increment the same counter
# as spawn failures, and the circuit breaker trips after N consecutive
# failures regardless of which outcome caused them.
# ---------------------------------------------------------------------------
def test_enforce_max_runtime_increments_consecutive_failures(kanban_home, monkeypatch):
"""A single timeout increments consecutive_failures by 1 (was the
infinite-respawn gap before unification)."""
import hermes_cli.kanban_db as _kb
state = {"sent_term": False}
def _alive(pid):
return not state["sent_term"]
def _signal(pid, sig):
import signal as _sig
if sig == _sig.SIGTERM:
state["sent_term"] = True
monkeypatch.setattr(_kb, "_pid_alive", _alive)
conn = kb.connect()
try:
tid = kb.create_task(
conn, title="overrun", assignee="worker",
max_runtime_seconds=1,
)
kb.claim_task(conn, tid)
kb._set_worker_pid(conn, tid, os.getpid())
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?",
(int(time.time()) - 30, tid),
)
before = kb.get_task(conn, tid)
assert before.consecutive_failures == 0
kb.enforce_max_runtime(conn, signal_fn=_signal)
after = kb.get_task(conn, tid)
assert after.consecutive_failures == 1
assert "elapsed" in (after.last_failure_error or "")
# Task status flipped back to ready (not yet past threshold).
assert after.status == "ready"
finally:
conn.close()
def test_repeated_timeouts_trip_the_circuit_breaker(kanban_home, monkeypatch):
"""N consecutive timeouts with the unified counter should eventually
hit the failure_limit threshold and auto-block the task. This closes
the Forbidden-Seeds-reported gap where timeout loops never capped.
"""
import hermes_cli.kanban_db as _kb
state = {"sent_term": False}
def _alive(pid):
return not state["sent_term"]
def _signal(pid, sig):
import signal as _sig
if sig == _sig.SIGTERM:
state["sent_term"] = True
monkeypatch.setattr(_kb, "_pid_alive", _alive)
conn = kb.connect()
try:
tid = kb.create_task(
conn, title="loop forever", assignee="slow-worker",
max_runtime_seconds=1,
)
# Drop the failure limit to 3 so we don't need 5 timeouts: the loop
# below monkeypatches the module-level DEFAULT_FAILURE_LIMIT before
# each enforce_max_runtime call.
for _ in range(3):
# Fresh claim + "started long ago" each iteration.
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET status='running', claim_lock=?, "
"claim_expires=?, worker_pid=?, started_at=? "
"WHERE id=?",
(
f"{_kb._claimer_id().split(':', 1)[0]}:lock",
int(time.time()) + 3600,
os.getpid(),
int(time.time()) - 30,
tid,
),
)
conn.execute(
"INSERT INTO task_runs (task_id, status, claim_lock, "
"claim_expires, worker_pid, started_at) "
"VALUES (?, 'running', ?, ?, ?, ?)",
(
tid,
f"{_kb._claimer_id().split(':', 1)[0]}:lock",
int(time.time()) + 3600,
os.getpid(),
int(time.time()) - 30,
),
)
rid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.execute(
"UPDATE tasks SET current_run_id=? WHERE id=?",
(rid, tid),
)
state["sent_term"] = False
# Lower the threshold by monkeypatching the default.
monkeypatch.setattr(_kb, "DEFAULT_FAILURE_LIMIT", 3)
kb.enforce_max_runtime(conn, signal_fn=_signal)
final = kb.get_task(conn, tid)
# After 3 consecutive timeouts with failure_limit=3, task should
# be auto-blocked, not looping forever as ``ready``.
assert final.status == "blocked", \
f"expected blocked after 3 timeouts, got {final.status}"
assert final.consecutive_failures >= 3
# ``gave_up`` event emitted (plus 3 ``timed_out`` events).
kinds = [
r["kind"] for r in conn.execute(
"SELECT kind FROM task_events WHERE task_id=? ORDER BY id",
(tid,),
)
]
assert kinds.count("timed_out") >= 3
assert "gave_up" in kinds
finally:
conn.close()
def test_detect_crashed_workers_increments_counter(kanban_home):
"""A single crash increments the consecutive_failures counter."""
conn = kb.connect()
try:
tid = kb.create_task(conn, title="crashy", assignee="worker")
kb.claim_task(conn, tid)
kb._set_worker_pid(conn, tid, 99999) # fake pid — not alive
kb.detect_crashed_workers(conn)
task = kb.get_task(conn, tid)
assert task.consecutive_failures == 1
assert task.status == "ready"
finally:
conn.close()
def test_reclaim_task_clears_failure_counter(kanban_home):
"""Operator reclaim wipes the counter so the next retry gets a fresh
budget."""
import secrets
conn = kb.connect()
try:
tid = kb.create_task(conn, title="stuck", assignee="worker")
lock = secrets.token_hex(4)
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET status='running', claim_lock=?, "
"claim_expires=?, worker_pid=?, consecutive_failures=4, "
"last_failure_error='prior issue' WHERE id=?",
(lock, int(time.time()) + 3600, 12345, tid),
)
conn.execute(
"INSERT INTO task_runs (task_id, status, claim_lock, "
"claim_expires, worker_pid, started_at) "
"VALUES (?, 'running', ?, ?, ?, ?)",
(tid, lock, int(time.time()) + 3600, 12345, int(time.time())),
)
rid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.execute(
"UPDATE tasks SET current_run_id=? WHERE id=?",
(rid, tid),
)
ok = kb.reclaim_task(conn, tid, reason="operator fixed config")
assert ok
task = kb.get_task(conn, tid)
assert task.consecutive_failures == 0
assert task.last_failure_error is None
assert task.status == "ready"
finally:
conn.close()


@ -39,8 +39,8 @@ def _task(**overrides):
"title": "demo task",
"assignee": "demo",
"status": "ready",
"spawn_failures": 0,
"last_spawn_error": None,
"consecutive_failures": 0,
"last_failure_error": None,
}
base.update(overrides)
return base
@ -126,27 +126,55 @@ def test_prose_phantom_refs_clears_on_later_clean_edit():
assert diags == []
def test_repeated_spawn_failures_fires_at_threshold():
task = _task(status="blocked", spawn_failures=3,
last_spawn_error="Profile 'debugger' does not exist")
diags = kd.compute_task_diagnostics(task, [], [])
def test_repeated_failures_fires_at_threshold_on_spawn():
"""A task with multiple spawn_failed runs gets a spawn-flavoured
diagnostic (title mentions 'spawn', suggested action is ``doctor``).
"""
task = _task(status="ready", consecutive_failures=3,
last_failure_error="Profile 'debugger' does not exist")
runs = [
_run(outcome="spawn_failed", run_id=1),
_run(outcome="spawn_failed", run_id=2),
_run(outcome="spawn_failed", run_id=3),
]
diags = kd.compute_task_diagnostics(task, [], runs)
assert len(diags) == 1
d = diags[0]
assert d.kind == "repeated_spawn_failures"
assert d.kind == "repeated_failures"
assert d.severity == "error"
# CLI hints are what operators actually need here.
suggested = [a.label for a in d.actions if a.suggested]
assert any("doctor" in s for s in suggested)
def test_repeated_spawn_failures_escalates_to_critical():
task = _task(spawn_failures=6, last_spawn_error="boom")
def test_repeated_failures_fires_on_timeout_loop():
"""The rule surfaces for timeout loops too — that's the point of
unifying the counter. Suggested action is 'check logs', not
'fix profile'."""
task = _task(status="ready", consecutive_failures=3,
last_failure_error="elapsed 600s > limit 300s")
runs = [
_run(outcome="timed_out", run_id=1),
_run(outcome="timed_out", run_id=2),
_run(outcome="timed_out", run_id=3),
]
diags = kd.compute_task_diagnostics(task, [], runs)
assert len(diags) == 1
d = diags[0]
assert d.kind == "repeated_failures"
assert d.data["most_recent_outcome"] == "timed_out"
suggested = [a.label for a in d.actions if a.suggested]
assert any("log" in s.lower() for s in suggested)
def test_repeated_failures_escalates_to_critical():
task = _task(consecutive_failures=6, last_failure_error="boom")
diags = kd.compute_task_diagnostics(task, [], [])
assert diags[0].severity == "critical"
def test_repeated_spawn_failures_below_threshold_silent():
task = _task(spawn_failures=2)
def test_repeated_failures_below_threshold_silent():
task = _task(consecutive_failures=2)
assert kd.compute_task_diagnostics(task, [], []) == []
@ -243,9 +271,9 @@ def test_repeated_crashes_no_error_fallback_title():
assert "no error recorded" in diags[0].title
def test_repeated_spawn_failures_surfaces_actual_error_in_title():
task = _task(spawn_failures=5,
last_spawn_error="insufficient_quota: billing limit reached")
def test_repeated_failures_surfaces_actual_error_in_title():
task = _task(consecutive_failures=5,
last_failure_error="insufficient_quota: billing limit reached")
diags = kd.compute_task_diagnostics(task, [], [])
assert len(diags) == 1
d = diags[0]
@ -280,8 +308,8 @@ def test_repeated_crashes_truncates_huge_tracebacks():
def test_diagnostics_sorted_critical_first():
"""A task with both a critical (many spawn failures) and a warning
(prose phantoms) diagnostic should list the critical one first."""
task = _task(status="done", spawn_failures=10,
last_spawn_error="nope")
task = _task(status="done", consecutive_failures=10,
last_failure_error="nope")
events = [
_event("completed", ts=100, summary="referenced t_missing"),
_event("suspected_hallucinated_references", ts=101,
@ -289,7 +317,7 @@ def test_diagnostics_sorted_critical_first():
]
diags = kd.compute_task_diagnostics(task, events, [])
kinds = [d.kind for d in diags]
assert kinds[0] == "repeated_spawn_failures" # critical
assert kinds[0] == "repeated_failures" # critical
assert "prose_phantom_refs" in kinds
@ -346,8 +374,8 @@ def test_broken_rule_is_isolated(monkeypatch):
# rules should still run and produce their diagnostics.
monkeypatch.setattr(kd, "_RULES", [_bad_rule] + kd._RULES)
task = _task(spawn_failures=5, last_spawn_error="e")
task = _task(consecutive_failures=5, last_failure_error="e")
diags = kd.compute_task_diagnostics(task, [], [])
# The broken rule silently drops, the real one still fires.
kinds = [d.kind for d in diags]
assert "repeated_spawn_failures" in kinds
assert "repeated_failures" in kinds


@ -1395,7 +1395,7 @@ def test_diagnostics_endpoint_severity_filter(client):
# An error-severity diagnostic (spawn failures) on another
p2 = kb.create_task(conn, title="spawn", assignee="b")
conn.execute(
"UPDATE tasks SET spawn_failures=5, last_spawn_error='x' WHERE id=?",
"UPDATE tasks SET consecutive_failures=5, last_failure_error='x' WHERE id=?",
(p2,),
)
conn.commit()