mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-08 03:01:47 +00:00
The dispatcher's circuit breaker only protected against spawn-side
failures (profile missing, workspace mount error, exec failure).
Workers that successfully spawned but then timed out or crashed
re-queued to ``ready`` with no counter increment, so the next tick
re-spawned them — loops forever until someone noticed. Reported
externally on Twitter (Forbidden Seeds) and confirmed by walking the
kernel: ``enforce_max_runtime`` flipped the task back to ready, emitted
a ``timed_out`` event, and never touched ``spawn_failures``; same for
``detect_crashed_workers``.
Fix: unify the counter across all non-success outcomes.
Schema
------
* ``tasks.spawn_failures`` → ``tasks.consecutive_failures``
* ``tasks.last_spawn_error`` → ``tasks.last_failure_error``
* Migration renames the columns in-place on existing DBs (``ALTER
TABLE RENAME COLUMN`` — SQLite >= 3.25) so historical counter
values are preserved. Row mappers fall through to the legacy names
if both column renames and a migration somehow got out of sync.
Counter lifecycle
-----------------
New helper ``_record_task_failure(conn, task_id, error, *, outcome,
release_claim, end_run, event_payload_extra)`` is the single point
every non-success outcome funnels through:
* ``spawn_failed`` → ``_record_spawn_failure`` (kept as alias)
calls it with ``release_claim=True, end_run=True`` — transitions
running→ready, clears claim, closes run.
* ``timed_out`` → ``enforce_max_runtime`` already does the status
transition + run close + event emission, then calls
``_record_task_failure`` with ``release_claim=False, end_run=False``
just to bump the counter (and trip the breaker if needed).
* ``crashed`` → ``detect_crashed_workers`` same pattern, but the
counter increment runs after the main write_txn closes (SQLite
doesn't nest write transactions).
If the counter hits the breaker threshold (``DEFAULT_FAILURE_LIMIT=5``,
same as before), the task transitions to ``blocked`` with a ``gave_up``
event on top of whatever outcome-specific event was already emitted.
Reset semantics changed: the counter now clears only on successful
``complete_task`` (and operator ``reclaim_task`` — an explicit "I've
looked at this, try again with a fresh budget"). Previously
``_clear_spawn_failures`` ran on every successful spawn, which would
have wiped the counter before a timeout could accumulate past threshold
— exactly the loop this fix prevents.
Diagnostics
-----------
* ``_rule_repeated_spawn_failures`` → ``_rule_repeated_failures``. Now
fires regardless of which outcome is at fault. Classifies the most
recent failure (spawn_failed / timed_out / crashed) from the run
history so the title ("Agent timeout x3", "Agent crash x4", "Agent
spawn x5") and suggested action (``doctor`` for spawn, ``log`` for
timeout/crash) stay outcome-specific without N duplicate rules.
* ``_rule_repeated_crashes`` kept as a narrower early-warning at
threshold 2 (vs 3 for the unified rule), but now suppresses itself
when the unified rule would also fire — avoids double-flagging.
* Diagnostic ``data`` payload now carries
``{consecutive_failures, most_recent_outcome, last_error}`` instead
of spawn-specific keys.
CLI
---
* ``Task.consecutive_failures`` / ``Task.last_failure_error`` are the
public fields now. Existing callers that referenced the old names
get migrated (tests updated in this commit).
* Backward-compat: ``DEFAULT_SPAWN_FAILURE_LIMIT``,
``_clear_spawn_failures``, ``_record_spawn_failure`` stay as aliases.
Tests
-----
* 6 new kernel tests: timeout increments counter, 3 consecutive
timeouts trip the breaker (was the reported gap), crash increments
counter, reclaim clears counter, completion clears counter, spawn
success does NOT clear counter.
* Diagnostic tests: updated ``repeated_spawn_failures`` cases to use
the new kind name and add a timeout-loop test.
* Dashboard API test: spawn_failures column update → consecutive_failures.
389/389 kanban-suite tests pass.
Live verification
-----------------
Seeded 4 tasks in an isolated HERMES_HOME: 3 timeouts, 4 crashes,
2-spawn-failed + 2-timed-out, and a task that had prior failures but
completed successfully. Board correctly shows "!! 3 tasks need
attention" (the successful one has no badge because the counter
reset). Drawer for the timeout-loop task renders "Agent timeout x3"
with most_recent_outcome=timed_out and the "Check logs" suggested
action (not the spawn-flavoured "Verify profile"). The successful
task has zero diagnostics.
Closes the Forbidden-Seeds-reported gap.
649 lines
23 KiB
Python
649 lines
23 KiB
Python
"""Kanban diagnostics — structured, actionable distress signals for tasks.
|
|
|
|
A ``Diagnostic`` is a machine-readable description of something that's wrong
|
|
with a kanban task: a hallucinated card id, a spawn crash-loop, a task
|
|
stuck blocked for too long, etc. Each one carries:
|
|
|
|
* A **kind** (canonical code; UI/tests match on this).
|
|
* A **severity** (``warning`` / ``error`` / ``critical``).
|
|
* A **title** (one-line human description) and **detail** (longer text).
|
|
* A list of **suggested actions** — structured entries the dashboard
|
|
turns into buttons and the CLI turns into hints.
|
|
|
|
Rules run over (task, recent events, recent runs) and emit diagnostics.
|
|
They are stateless and read-only — no DB writes. Callers compute
|
|
diagnostics on demand (on ``/board`` load, ``/tasks/:id`` fetch, or
|
|
``hermes kanban diagnostics``).
|
|
|
|
Design goals:
|
|
|
|
* Fixable-on-the-operator's-side signals only (missing config, phantom
|
|
ids, crash loop). Not "the provider returned 502 once" — that's a
|
|
transient runtime blip, not a diagnostic.
|
|
* Recoverable: every diagnostic comes with at least one suggested
|
|
recovery action the operator can actually take from the UI.
|
|
* Auto-clearing: when the underlying failure mode resolves (a clean
|
|
``completed`` event arrives, a spawn succeeds, the task gets
|
|
unblocked), the diagnostic stops firing. The audit event trail stays.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable, Iterable, Optional
|
|
import json
|
|
import time
|
|
|
|
|
|
# Severity rungs, ordered least → most urgent. The UI colors them
|
|
# amber (warning), orange (error), red (critical). Sorted outputs put
|
|
# critical first so operators see the worst fires at the top.
|
|
SEVERITY_ORDER = ("warning", "error", "critical")
|
|
|
|
|
|
@dataclass
|
|
class DiagnosticAction:
|
|
"""A single recovery action attached to a diagnostic.
|
|
|
|
The ``kind`` determines how both the UI and CLI render it:
|
|
|
|
* ``reclaim`` / ``reassign`` — POST to the matching /tasks/:id/*
|
|
endpoint; dashboard wires into the existing recovery popover.
|
|
* ``unblock`` — PATCH status back to ``ready`` (for stuck-blocked
|
|
diagnostics).
|
|
* ``cli_hint`` — print/copy a shell command (e.g.
|
|
``hermes -p <profile> auth``). No HTTP side effect.
|
|
* ``open_docs`` — deep-link to the docs URL named in ``payload.url``.
|
|
* ``comment`` — nudge the operator to add a comment (for
|
|
stuck-blocked tasks that need human input).
|
|
|
|
``suggested=True`` marks the action as the recommended first step;
|
|
the UI highlights it. Multiple actions can be suggested if they're
|
|
equally valid.
|
|
"""
|
|
|
|
kind: str
|
|
label: str
|
|
payload: dict = field(default_factory=dict)
|
|
suggested: bool = False
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"kind": self.kind,
|
|
"label": self.label,
|
|
"payload": self.payload,
|
|
"suggested": self.suggested,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class Diagnostic:
|
|
"""One active distress signal on a task."""
|
|
|
|
kind: str
|
|
severity: str # "warning" | "error" | "critical"
|
|
title: str
|
|
detail: str
|
|
actions: list[DiagnosticAction] = field(default_factory=list)
|
|
first_seen_at: int = 0
|
|
last_seen_at: int = 0
|
|
count: int = 1
|
|
# Optional: the run id this diagnostic is scoped to. None = task-wide.
|
|
run_id: Optional[int] = None
|
|
# Optional structured payload for the UI (phantom ids, failure count).
|
|
data: dict = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"kind": self.kind,
|
|
"severity": self.severity,
|
|
"title": self.title,
|
|
"detail": self.detail,
|
|
"actions": [a.to_dict() for a in self.actions],
|
|
"first_seen_at": self.first_seen_at,
|
|
"last_seen_at": self.last_seen_at,
|
|
"count": self.count,
|
|
"run_id": self.run_id,
|
|
"data": self.data,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rule helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _task_field(task, name, default=None):
|
|
"""Read a field from a task regardless of representation.
|
|
|
|
Callers pass sqlite3.Row (dict-like with [] but no attribute
|
|
access), kanban_db.Task dataclasses (attribute access), or plain
|
|
dicts (both). This normalises them so rule functions don't have
|
|
to branch on type each time.
|
|
"""
|
|
if task is None:
|
|
return default
|
|
# sqlite Row + plain dicts both support mapping access; Row also
|
|
# supports .keys().
|
|
try:
|
|
# Row raises IndexError if the key isn't a column in the query;
|
|
# dicts return default via .get. Handle both.
|
|
if hasattr(task, "keys") and name in task.keys():
|
|
return task[name]
|
|
except Exception:
|
|
pass
|
|
if isinstance(task, dict):
|
|
return task.get(name, default)
|
|
return getattr(task, name, default)
|
|
|
|
|
|
def _parse_payload(ev) -> dict:
|
|
"""Tolerate event.payload being either a dict or a JSON string."""
|
|
p = _task_field(ev, "payload", None)
|
|
if p is None:
|
|
return {}
|
|
if isinstance(p, dict):
|
|
return p
|
|
if isinstance(p, str):
|
|
try:
|
|
return json.loads(p) or {}
|
|
except Exception:
|
|
return {}
|
|
return {}
|
|
|
|
|
|
def _event_kind(ev) -> str:
|
|
return _task_field(ev, "kind", "") or ""
|
|
|
|
|
|
def _event_ts(ev) -> int:
|
|
t = _task_field(ev, "created_at", 0)
|
|
return int(t or 0)
|
|
|
|
|
|
def _active_hallucination_events(
|
|
events: Iterable[Any],
|
|
kind: str,
|
|
) -> list[Any]:
|
|
"""Return events of ``kind`` that have no ``completed``/``edited``
|
|
event *strictly after* them. Walks chronologically: each clean
|
|
event resets the accumulator; each matching event gets appended.
|
|
|
|
Events must be sorted by id (i.e. arrival order); callers pass the
|
|
task's full event list which the DB already returns in that order.
|
|
"""
|
|
# Events arrive sorted by id asc (chronological). Walk once, track
|
|
# which hallucination events are still "active" (no clean event
|
|
# supersedes them).
|
|
active: list[Any] = []
|
|
for ev in events:
|
|
k = _event_kind(ev)
|
|
if k in ("completed", "edited"):
|
|
active.clear()
|
|
elif k == kind:
|
|
active.append(ev)
|
|
return active
|
|
|
|
|
|
def _latest_clean_event_ts(events: Iterable[Any]) -> int:
|
|
"""Timestamp of the most recent clean completion / edit event.
|
|
|
|
Kept for general "has this task ever been successfully completed"
|
|
lookups; hallucination rules use ``_active_hallucination_events``
|
|
instead because they need strict ordering.
|
|
"""
|
|
latest = 0
|
|
for ev in events:
|
|
if _event_kind(ev) in ("completed", "edited"):
|
|
t = _event_ts(ev)
|
|
if t > latest:
|
|
latest = t
|
|
return latest
|
|
|
|
|
|
# Standard always-available actions. Every diagnostic can offer these as
|
|
# fallbacks regardless of kind — they're the two baseline recovery
|
|
# primitives the kernel supports.
|
|
def _generic_recovery_actions(task: Any, *, running: bool) -> list[DiagnosticAction]:
|
|
out: list[DiagnosticAction] = []
|
|
if running:
|
|
out.append(DiagnosticAction(
|
|
kind="reclaim",
|
|
label="Reclaim task",
|
|
payload={},
|
|
))
|
|
out.append(DiagnosticAction(
|
|
kind="reassign",
|
|
label="Reassign to different profile",
|
|
payload={"reclaim_first": running},
|
|
))
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rule implementations
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Each rule takes (task, events, runs, now_ts, config) and returns
|
|
# zero or more Diagnostic instances. ``events`` / ``runs`` are lists of
|
|
# kanban_db.Event / kanban_db.Run (or plain dicts matching the same
|
|
# shape — for test convenience).
|
|
|
|
RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]]
|
|
|
|
|
|
def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|
"""Blocked-hallucination gate fires: a worker called kanban_complete
|
|
with created_cards that didn't exist or weren't created by the
|
|
completing profile. Task stayed in its prior state; the operator
|
|
needs to decide how to proceed.
|
|
|
|
Auto-clears when a successful completion (or edit) follows the
|
|
blocked event.
|
|
"""
|
|
hits = _active_hallucination_events(events, "completion_blocked_hallucination")
|
|
if not hits:
|
|
return []
|
|
phantom_ids: list[str] = []
|
|
first = _event_ts(hits[0])
|
|
last = _event_ts(hits[-1])
|
|
for ev in hits:
|
|
payload = _parse_payload(ev)
|
|
for pid in payload.get("phantom_cards", []) or []:
|
|
if pid not in phantom_ids:
|
|
phantom_ids.append(pid)
|
|
running = _task_field(task, "status") == "running"
|
|
actions: list[DiagnosticAction] = []
|
|
actions.append(DiagnosticAction(
|
|
kind="comment",
|
|
label="Add a comment explaining what to do",
|
|
suggested=False,
|
|
))
|
|
actions.extend(_generic_recovery_actions(task, running=running))
|
|
return [Diagnostic(
|
|
kind="hallucinated_cards",
|
|
severity="error",
|
|
title="Worker claimed cards that don't exist",
|
|
detail=(
|
|
f"The completing worker declared created_cards that either didn't "
|
|
f"exist or weren't created by its profile. The completion was "
|
|
f"blocked and the task stayed in its prior state. "
|
|
f"Usually means the worker hallucinated ids instead of capturing "
|
|
f"return values from kanban_create."
|
|
),
|
|
actions=actions,
|
|
first_seen_at=first,
|
|
last_seen_at=last,
|
|
count=len(hits),
|
|
data={"phantom_ids": phantom_ids},
|
|
)]
|
|
|
|
|
|
def _rule_prose_phantom_refs(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|
"""Advisory prose-scan: the completion summary mentions ``t_<hex>``
|
|
ids that don't resolve. Non-blocking; surfaced as a warning only.
|
|
|
|
Auto-clears when a fresh clean completion arrives AFTER the
|
|
suspected event.
|
|
"""
|
|
hits = _active_hallucination_events(events, "suspected_hallucinated_references")
|
|
if not hits:
|
|
return []
|
|
phantom_refs: list[str] = []
|
|
for ev in hits:
|
|
for pid in _parse_payload(ev).get("phantom_refs", []) or []:
|
|
if pid not in phantom_refs:
|
|
phantom_refs.append(pid)
|
|
running = _task_field(task, "status") == "running"
|
|
return [Diagnostic(
|
|
kind="prose_phantom_refs",
|
|
severity="warning",
|
|
title="Completion summary references unknown task ids",
|
|
detail=(
|
|
"The completion summary mentions task ids that don't resolve "
|
|
"in this board's database. The completion itself succeeded, "
|
|
"but downstream consumers parsing the summary may be pointed "
|
|
"at cards that never existed."
|
|
),
|
|
actions=_generic_recovery_actions(task, running=running),
|
|
first_seen_at=_event_ts(hits[0]),
|
|
last_seen_at=_event_ts(hits[-1]),
|
|
count=len(hits),
|
|
data={"phantom_refs": phantom_refs},
|
|
)]
|
|
|
|
|
|
def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|
"""Task's unified ``consecutive_failures`` counter is climbing —
|
|
something about this task+profile combo is broken and each retry
|
|
fails the same way. Triggers regardless of the specific failure
|
|
mode (spawn error, timeout, crash) because operationally they
|
|
all look the same: the kernel keeps retrying and the operator
|
|
needs to intervene.
|
|
|
|
Threshold: cfg["failure_threshold"] (default 3). A threshold of 3
|
|
is one below the circuit-breaker's default (5), so the diagnostic
|
|
surfaces BEFORE the breaker trips — giving operators a window to
|
|
fix the problem while the dispatcher's still retrying.
|
|
|
|
Accepts the legacy ``spawn_failure_threshold`` config key for
|
|
back-compat.
|
|
"""
|
|
threshold = int(cfg.get(
|
|
"failure_threshold",
|
|
cfg.get("spawn_failure_threshold", 3),
|
|
))
|
|
# Read the new unified counter name, with a fallback to the legacy
|
|
# column name so this rule keeps working against old DB rows the
|
|
# caller somehow materialised without running the migration.
|
|
failures = (
|
|
_task_field(task, "consecutive_failures", None)
|
|
if _task_field(task, "consecutive_failures", None) is not None
|
|
else _task_field(task, "spawn_failures", 0)
|
|
)
|
|
if failures is None or failures < threshold:
|
|
return []
|
|
last_err = (
|
|
_task_field(task, "last_failure_error", None)
|
|
if _task_field(task, "last_failure_error", None) is not None
|
|
else _task_field(task, "last_spawn_error", None)
|
|
)
|
|
assignee = _task_field(task, "assignee")
|
|
|
|
# Classify the most recent failure by peeking at run outcomes so
|
|
# the title + suggested action can be specific without a separate
|
|
# per-outcome rule.
|
|
ordered_runs = sorted(runs, key=lambda r: _task_field(r, "id", 0))
|
|
most_recent_outcome = None
|
|
for r in reversed(ordered_runs):
|
|
oc = _task_field(r, "outcome")
|
|
if oc in ("spawn_failed", "timed_out", "crashed"):
|
|
most_recent_outcome = oc
|
|
break
|
|
|
|
actions: list[DiagnosticAction] = []
|
|
if most_recent_outcome == "spawn_failed" and assignee and assignee != "default":
|
|
# Spawn is failing specifically — profile setup issue.
|
|
actions.append(DiagnosticAction(
|
|
kind="cli_hint",
|
|
label=f"Verify profile: hermes -p {assignee} doctor",
|
|
payload={"command": f"hermes -p {assignee} doctor"},
|
|
suggested=True,
|
|
))
|
|
actions.append(DiagnosticAction(
|
|
kind="cli_hint",
|
|
label=f"Fix profile auth: hermes -p {assignee} auth",
|
|
payload={"command": f"hermes -p {assignee} auth"},
|
|
))
|
|
elif most_recent_outcome in ("timed_out", "crashed"):
|
|
# Worker got off the ground but died. Logs are the right place
|
|
# to diagnose; reclaim/reassign are the recovery levers.
|
|
task_id = _task_field(task, "id")
|
|
if task_id:
|
|
actions.append(DiagnosticAction(
|
|
kind="cli_hint",
|
|
label=f"Check logs: hermes kanban log {task_id}",
|
|
payload={"command": f"hermes kanban log {task_id}"},
|
|
suggested=True,
|
|
))
|
|
actions.extend(_generic_recovery_actions(
|
|
task, running=_task_field(task, "status") == "running",
|
|
))
|
|
|
|
severity = "critical" if failures >= threshold * 2 else "error"
|
|
err_text = (last_err or "").strip() if last_err else ""
|
|
err_snippet = err_text[:500] + ("…" if len(err_text) > 500 else "") if err_text else ""
|
|
outcome_label = {
|
|
"spawn_failed": "spawn",
|
|
"timed_out": "timeout",
|
|
"crashed": "crash",
|
|
}.get(most_recent_outcome or "", "failure")
|
|
if err_snippet:
|
|
title = f"Agent {outcome_label} x{failures}: {err_snippet.splitlines()[0][:160]}"
|
|
detail = (
|
|
f"This task has failed {failures} times in a row "
|
|
f"(most recent: {outcome_label}). Full last error:\n\n"
|
|
f"{err_snippet}\n\n"
|
|
f"The dispatcher will keep retrying until the consecutive-"
|
|
f"failures counter trips the circuit breaker (default 5), "
|
|
f"at which point the task auto-blocks. Fix the root cause "
|
|
f"and reclaim to retry."
|
|
)
|
|
else:
|
|
title = f"Agent {outcome_label} x{failures} (no error recorded)"
|
|
detail = (
|
|
f"This task has failed {failures} times in a row "
|
|
f"(most recent: {outcome_label}) but no error text was "
|
|
f"captured. Check the suggested command or the worker log."
|
|
)
|
|
return [Diagnostic(
|
|
kind="repeated_failures",
|
|
severity=severity,
|
|
title=title,
|
|
detail=detail,
|
|
actions=actions,
|
|
first_seen_at=now,
|
|
last_seen_at=now,
|
|
count=failures,
|
|
data={
|
|
"consecutive_failures": failures,
|
|
"most_recent_outcome": most_recent_outcome,
|
|
"last_error": last_err,
|
|
},
|
|
)]
|
|
|
|
|
|
def _rule_repeated_crashes(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|
"""The worker spawns fine but keeps crashing mid-run. Check the last
|
|
N runs' outcomes; N consecutive ``crashed`` without a successful
|
|
``completed`` means something about the task + profile combo is
|
|
broken (OOM, missing dependency, tool it needs is down).
|
|
|
|
Threshold: cfg["crash_threshold"] (default 2).
|
|
|
|
Narrower than ``repeated_failures`` — fires earlier (2 crashes vs 3
|
|
total failures) so the operator gets a crash-specific heads-up
|
|
before the unified rule kicks in. Suppresses itself when the
|
|
unified rule is also about to fire, to avoid double-flagging.
|
|
"""
|
|
failure_threshold = int(cfg.get(
|
|
"failure_threshold",
|
|
cfg.get("spawn_failure_threshold", 3),
|
|
))
|
|
unified_counter = (
|
|
_task_field(task, "consecutive_failures", 0) or 0
|
|
)
|
|
# Unified rule will catch this — let it handle to avoid double fire.
|
|
if unified_counter >= failure_threshold:
|
|
return []
|
|
|
|
threshold = int(cfg.get("crash_threshold", 2))
|
|
ordered = sorted(runs, key=lambda r: _task_field(r, "id", 0))
|
|
# Count trailing consecutive 'crashed' outcomes.
|
|
consecutive = 0
|
|
last_err = None
|
|
for r in reversed(ordered):
|
|
outcome = _task_field(r, "outcome")
|
|
if outcome == "crashed":
|
|
consecutive += 1
|
|
if last_err is None:
|
|
last_err = _task_field(r, "error")
|
|
elif outcome in ("completed", "reclaimed"):
|
|
# A success (or manual reclaim) breaks the streak.
|
|
break
|
|
else:
|
|
# Other outcomes (timed_out, blocked, spawn_failed, gave_up)
|
|
# aren't crash signals — don't count them, but they also
|
|
# don't break the crash streak.
|
|
continue
|
|
if consecutive < threshold:
|
|
return []
|
|
task_id = _task_field(task, "id")
|
|
actions: list[DiagnosticAction] = []
|
|
if task_id:
|
|
actions.append(DiagnosticAction(
|
|
kind="cli_hint",
|
|
label=f"Check logs: hermes kanban log {task_id}",
|
|
payload={"command": f"hermes kanban log {task_id}"},
|
|
suggested=True,
|
|
))
|
|
running = _task_field(task, "status") == "running"
|
|
actions.extend(_generic_recovery_actions(task, running=running))
|
|
severity = "critical" if consecutive >= threshold * 2 else "error"
|
|
# Put the actual error up-front so operators see WHAT broke without
|
|
# having to open the logs. Truncate defensively — these can be huge
|
|
# (full tracebacks).
|
|
err_text = (last_err or "").strip() if last_err else ""
|
|
err_snippet = err_text[:500] + ("…" if len(err_text) > 500 else "") if err_text else ""
|
|
if err_snippet:
|
|
title = f"Agent crashed {consecutive}x: {err_snippet.splitlines()[0][:160]}"
|
|
detail = (
|
|
f"The last {consecutive} runs ended with outcome=crashed. "
|
|
f"Full last error:\n\n{err_snippet}"
|
|
)
|
|
else:
|
|
title = f"Agent crashed {consecutive}x (no error recorded)"
|
|
detail = (
|
|
f"The last {consecutive} runs ended with outcome=crashed but "
|
|
f"no error text was captured. Check the worker log for more."
|
|
)
|
|
return [Diagnostic(
|
|
kind="repeated_crashes",
|
|
severity=severity,
|
|
title=title,
|
|
detail=detail,
|
|
actions=actions,
|
|
first_seen_at=now,
|
|
last_seen_at=now,
|
|
count=consecutive,
|
|
data={"consecutive_crashes": consecutive, "last_error": last_err},
|
|
)]
|
|
|
|
|
|
def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|
"""Task has been in ``blocked`` status for too long without a comment.
|
|
|
|
Threshold: cfg["blocked_stale_hours"] (default 24).
|
|
Surfaced as a warning so humans know there's a pending unblock.
|
|
"""
|
|
hours = float(cfg.get("blocked_stale_hours", 24))
|
|
status = _task_field(task, "status")
|
|
if status != "blocked":
|
|
return []
|
|
# Find the most recent ``blocked`` event.
|
|
last_blocked_ts = 0
|
|
for ev in events:
|
|
if _event_kind(ev) == "blocked":
|
|
t = _event_ts(ev)
|
|
if t > last_blocked_ts:
|
|
last_blocked_ts = t
|
|
if last_blocked_ts == 0:
|
|
return []
|
|
age_hours = (now - last_blocked_ts) / 3600.0
|
|
if age_hours < hours:
|
|
return []
|
|
# Any comment / unblock after the block breaks the "stale" signal.
|
|
for ev in events:
|
|
if _event_kind(ev) in ("commented", "unblocked") and _event_ts(ev) > last_blocked_ts:
|
|
return []
|
|
actions: list[DiagnosticAction] = [
|
|
DiagnosticAction(
|
|
kind="comment",
|
|
label="Add a comment / unblock the task",
|
|
suggested=True,
|
|
),
|
|
]
|
|
return [Diagnostic(
|
|
kind="stuck_in_blocked",
|
|
severity="warning",
|
|
title=f"Task has been blocked for {int(age_hours)}h",
|
|
detail=(
|
|
f"This task transitioned to blocked {int(age_hours)}h ago and "
|
|
f"has had no comments or unblock attempts since. Blocked tasks "
|
|
f"are waiting for human input — check the block reason and "
|
|
f"either unblock with feedback or answer with a comment."
|
|
),
|
|
actions=actions,
|
|
first_seen_at=last_blocked_ts,
|
|
last_seen_at=last_blocked_ts,
|
|
count=1,
|
|
data={"blocked_at": last_blocked_ts, "age_hours": round(age_hours, 1)},
|
|
)]
|
|
|
|
|
|
# Registry — order matters: rules higher on the list render first when
|
|
# severity ties. Add new rules here.
|
|
_RULES: list[RuleFn] = [
|
|
_rule_hallucinated_cards,
|
|
_rule_prose_phantom_refs,
|
|
_rule_repeated_failures,
|
|
_rule_repeated_crashes,
|
|
_rule_stuck_in_blocked,
|
|
]
|
|
|
|
|
|
# Known kinds (for the UI's filter / legend / i18n keys). Update when
|
|
# rules are added.
|
|
DIAGNOSTIC_KINDS = (
|
|
"hallucinated_cards",
|
|
"prose_phantom_refs",
|
|
"repeated_failures",
|
|
"repeated_crashes",
|
|
"stuck_in_blocked",
|
|
)
|
|
|
|
|
|
DEFAULT_CONFIG = {
|
|
"failure_threshold": 3,
|
|
# Legacy alias accepted at read time by _rule_repeated_failures.
|
|
"spawn_failure_threshold": 3,
|
|
"crash_threshold": 2,
|
|
"blocked_stale_hours": 24,
|
|
}
|
|
|
|
|
|
def compute_task_diagnostics(
|
|
task,
|
|
events: list,
|
|
runs: list,
|
|
*,
|
|
now: Optional[int] = None,
|
|
config: Optional[dict] = None,
|
|
) -> list[Diagnostic]:
|
|
"""Run every rule against a single task's state and return a
|
|
severity-sorted list of active diagnostics.
|
|
|
|
Sorting: critical first, then error, then warning; ties broken by
|
|
most-recent ``last_seen_at``.
|
|
"""
|
|
now_ts = int(now if now is not None else time.time())
|
|
cfg = {**DEFAULT_CONFIG, **(config or {})}
|
|
out: list[Diagnostic] = []
|
|
for rule in _RULES:
|
|
try:
|
|
out.extend(rule(task, events, runs, now_ts, cfg))
|
|
except Exception:
|
|
# A broken rule must never crash the dashboard. Rule bugs
|
|
# get caught in tests; in production we'd rather drop the
|
|
# diagnostic than 500 a whole /board request.
|
|
continue
|
|
severity_idx = {s: i for i, s in enumerate(SEVERITY_ORDER)}
|
|
out.sort(
|
|
key=lambda d: (
|
|
-severity_idx.get(d.severity, -1),
|
|
-(d.last_seen_at or 0),
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
def severity_of_highest(diagnostics: Iterable[Diagnostic]) -> Optional[str]:
|
|
"""Highest severity present in the list, or None if empty. Useful
|
|
for card badges that need a single color."""
|
|
highest_idx = -1
|
|
highest = None
|
|
for d in diagnostics:
|
|
idx = SEVERITY_ORDER.index(d.severity) if d.severity in SEVERITY_ORDER else -1
|
|
if idx > highest_idx:
|
|
highest_idx = idx
|
|
highest = d.severity
|
|
return highest
|