hermes-agent/hermes_cli/kanban_diagnostics.py
Teknium f67063ba81
feat(kanban): generic diagnostics engine for task distress signals (#20332)
* feat(kanban): generic diagnostics engine for task distress signals

Replaces the hallucination-specific ``warnings`` / ``RecoverySection``
surface (shipped in PR #20232) with a reusable diagnostic-rule engine
that covers five distress kinds in v1 and can be extended without
touching UI code. The "something's wrong with this task" signal is
no longer limited to phantom card ids.

Closes the follow-up from #20232 discussion.

New module
----------
``hermes_cli/kanban_diagnostics.py`` — stateless, no-side-effect rule
engine. Each rule is a pure function of
``(task, events, runs, now, config) -> list[Diagnostic]``. Registry
is a simple list; adding a new distress kind is one function + one
import, no UI or API changes required.
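The rule contract can be sketched in miniature as follows — the retry rule, its field names, and its threshold are hypothetical, purely for illustration of the extension point:

```python
from dataclasses import dataclass

# Minimal re-sketch of the engine's shapes. The retry rule below is NOT
# part of the shipped v1 set; it only shows what "one function + one
# registry entry" looks like.
@dataclass
class Diagnostic:
    kind: str
    severity: str
    title: str

def _rule_too_many_retries(task, events, runs, now, cfg):
    # A rule is a pure function of its inputs: no DB writes, no state.
    retries = task.get("retries", 0)
    if retries >= cfg.get("retry_threshold", 5):
        return [Diagnostic("too_many_retries", "warning",
                           f"Task retried {retries}x")]
    return []

# The registry is a plain list; adding a distress kind is one append.
RULES = [_rule_too_many_retries]

def compute(task, events=(), runs=(), now=0, cfg=None):
    out = []
    for rule in RULES:
        out.extend(rule(task, list(events), list(runs), now, cfg or {}))
    return out
```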

v1 rule set
-----------
* ``hallucinated_cards`` (error) — folds the existing
  ``completion_blocked_hallucination`` event into the new surface.
* ``prose_phantom_refs`` (warning) — folds
  ``suspected_hallucinated_references``.
* ``repeated_spawn_failures`` (error → critical at 2x threshold) —
  fires when ``tasks.spawn_failures >= 3``; suggests
  ``hermes -p <profile> doctor`` / ``auth``.
* ``repeated_crashes`` (error → critical) — fires after N consecutive
  ``crashed`` run outcomes with no successful completion between;
  suggests ``hermes kanban log <id>``.
* ``stuck_in_blocked`` (warning) — fires after 24h in ``blocked``
  state with no comments / unblock attempts; suggests commenting.
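The error → critical escalation those thresholds imply amounts to this (a sketch of the shared pattern, not the module's exact code):

```python
from typing import Optional

def escalate(count: int, threshold: int) -> Optional[str]:
    # Below the threshold nothing fires; at the threshold the rule emits
    # an error; at double the threshold it escalates to critical.
    if count < threshold:
        return None
    return "critical" if count >= threshold * 2 else "error"
```

With the default ``spawn_failure_threshold`` of 3, three spawn failures surface as an error and six as a critical.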

Every diagnostic carries structured ``actions`` (reclaim, reassign,
unblock, cli_hint, comment, open_docs) that render consistently in
both CLI and dashboard. Suggested actions are highlighted; generic
recovery actions (reclaim / reassign) are available on every kind as
fallbacks.

Diagnostics auto-clear when the underlying failure resolves — a
clean ``completed``/``edited`` event drops hallucination diagnostics,
a successful run drops crash diagnostics, a comment drops
stuck-blocked diagnostics. Audit events persist; the badge goes away.
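For event-backed diagnostics, the auto-clear behaviour is a single chronological walk over the event list — sketched here assuming plain-dict events with a ``kind`` field:

```python
def active_events(events, kind):
    # Walk chronologically: each clean completed/edited event clears the
    # accumulator, so only distress events with no later clean event
    # survive. The audit trail itself is untouched; only the live
    # signal (and hence the badge) goes away.
    active = []
    for ev in events:
        if ev["kind"] in ("completed", "edited"):
            active.clear()
        elif ev["kind"] == kind:
            active.append(ev)
    return active
```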

API
---
``plugin_api.py``:
* ``/board`` now attaches ``diagnostics`` (full list) and
  ``warnings`` (compact summary with ``highest_severity``) per task.
* ``/tasks/{id}`` attaches diagnostics so the drawer's Diagnostics
  section auto-opens on flagged tasks.
* NEW ``/diagnostics`` endpoint — fleet-wide listing, filterable by
  severity, sorted critical-first.
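The critical-first ordering can be reproduced client-side — a sketch assuming the endpoint returns a JSON list of diagnostic dicts each carrying a ``severity`` field:

```python
SEVERITY_ORDER = ("warning", "error", "critical")

def critical_first(diagnostics):
    # Worst-first ordering for a /diagnostics payload; diagnostics with
    # an unknown severity sink to the end instead of raising.
    idx = {s: i for i, s in enumerate(SEVERITY_ORDER)}
    return sorted(diagnostics, key=lambda d: -idx.get(d.get("severity"), -1))
```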

CLI
---
* NEW ``hermes kanban diagnostics [--severity X] [--task id]
  [--json]`` — fleet view or single-task view, matches dashboard rule
  output so CLI users see the same picture.
* ``hermes kanban show <id>`` now renders a Diagnostics section near
  the top with severity markers + suggested actions.

Dashboard
---------
* Card badge is severity-coloured (⚠ amber warning, !! orange error,
  !!! red critical) using ``warnings.highest_severity``.
* Attention strip above the toolbar counts EVERY task with active
  diagnostics (not just hallucinations), severity-coloured, lists
  affected tasks with Open buttons when expanded.
* Drawer's old ``RecoverySection`` replaced with generic
  ``DiagnosticsSection`` rendering a card per active diagnostic:
  title + detail + structured data (task-id chips when payload keys
  look like id lists) + action buttons. Reassign profile picker is
  inline per-diagnostic. Clipboard fallback uses ``.catch()`` for
  environments where writeText rejects.
* Three-rung severity palette; amber for warning, orange for error,
  red for critical. Uses CSS variables so theming is straightforward.

Tests
-----
* NEW ``tests/hermes_cli/test_kanban_diagnostics.py`` — 14 unit tests
  covering each rule's positive/negative/threshold paths, severity
  sorting, broken-rule isolation, and sqlite3.Row integration.
* Dashboard plugin tests extended: ``/diagnostics`` endpoint (empty,
  populated, severity-filtered), ``/board`` exposes both diagnostic
  list and compact summary with ``highest_severity``.
* Existing hallucination-specific test
  (``test_board_surfaces_warnings_field_for_hallucinated_completions``)
  updated to reflect the new contract: warning summary keys by
  diagnostic kind (``hallucinated_cards``), not event kind.

379 kanban-suite tests pass (+16 net from this PR).

Live verification
-----------------
Seeded all 5 diagnostic kinds + one clean + one plain-running task
(7 total) into an isolated HERMES_HOME, spun up the dashboard, and
verified:

* Attention strip: shows ``!! 5 tasks need attention`` in the
  error-severity orange; Show expands to a list of 5 rows ordered
  critical > error > warning.
* Card badges: error tasks render ``!!`` orange, warning tasks
  render ``⚠`` amber, clean and plain-running tasks render no badge.
* Each of the 5 rules opens a correctly-coloured, correctly-styled
  diagnostic card in the drawer with its specific suggested action.
* Live reassign from a diagnostic card flipped
  ``broken-ml-worker → alice`` and the drawer refreshed with the
  new assignee + the same diagnostic still firing (correct:
  spawn_failures counter hasn't reset yet).
* CLI ``hermes kanban diagnostics`` prints all 5 in severity order;
  ``--severity error`` narrows to 3; ``kanban show <id>`` includes
  the Diagnostics block at the top with suggested action hint.

Migration note
--------------
The old ``warnings`` shape (``{count, kinds, latest_at}``) is
preserved on the API but ``kinds`` now keys by diagnostic kind
(``hallucinated_cards``) instead of event kind
(``completion_blocked_hallucination``). ``highest_severity`` is a
new required field. The dashboard was the only consumer and has
been updated in the same commit; external API consumers of the
``warnings`` field will need to update their kind-match logic.
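For an external consumer, the kind-match update is a one-line change — sketched here with a hypothetical helper name:

```python
def flags_hallucination(task: dict) -> bool:
    # The warnings summary now keys by diagnostic kind, not event kind,
    # so the membership test changes accordingly.
    kinds = task.get("warnings", {}).get("kinds", {})
    # old contract: return "completion_blocked_hallucination" in kinds
    return "hallucinated_cards" in kinds
```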

* feat(kanban/diagnostics): lead titles with the actual error text

The generic 'Worker crashed N runs in a row' / 'Worker failed to spawn
N times' titles buried the actual cause in the data section. Operators
had to open logs or expand the diagnostic to see WHY the worker is
stuck — rate-limit vs insufficient quota vs bad auth vs context
overflow vs network blip all looked identical at a glance.

New titles:

  Agent crashed 3x: openai: 429 Too Many Requests - rate limit reached
  Agent crashed 3x: anthropic: 402 insufficient_quota - credit balance
  Agent crashed 3x: provider auth error: 401 Unauthorized
  Agent spawn failed 4x: insufficient_quota: You exceeded your current

Detail keeps the full error snippet (capped at 500 chars + ellipsis
for tracebacks). Title takes the first line capped at 160 chars.
Fallback title if no error recorded stays honest ('no error recorded').
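The capping rules amount to the following (a sketch; the helper name is illustrative):

```python
def lead_with_error(prefix: str, err: str):
    # Detail keeps up to 500 chars of the error, with an ellipsis only
    # when something was actually cut; the title takes the first line
    # of the snippet, capped at 160 chars.
    snippet = err[:500] + ("…" if len(err) > 500 else "")
    title = f"{prefix}: {snippet.splitlines()[0][:160]}"
    return title, snippet
```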

Tests: 4 new cases covering 429/billing/spawn/truncation. 383 total
pass (+4).

Live-verified on dashboard with 6 seeded scenarios
(rate-limit, billing, auth, context, network, spawn-billing) —
each card title leads with the actionable error text.
2026-05-05 13:32:42 -07:00


"""Kanban diagnostics — structured, actionable distress signals for tasks.
A ``Diagnostic`` is a machine-readable description of something that's wrong
with a kanban task: a hallucinated card id, a spawn crash-loop, a task
stuck blocked for too long, etc. Each one carries:
* A **kind** (canonical code; UI/tests match on this).
* A **severity** (``warning`` / ``error`` / ``critical``).
* A **title** (one-line human description) and **detail** (longer text).
* A list of **suggested actions** — structured entries the dashboard
turns into buttons and the CLI turns into hints.
Rules run over (task, recent events, recent runs) and emit diagnostics.
They are stateless and read-only — no DB writes. Callers compute
diagnostics on demand (on ``/board`` load, ``/tasks/:id`` fetch, or
``hermes kanban diagnostics``).
Design goals:
* Fixable-on-the-operator's-side signals only (missing config, phantom
ids, crash loop). Not "the provider returned 502 once" — that's a
transient runtime blip, not a diagnostic.
* Recoverable: every diagnostic comes with at least one suggested
recovery action the operator can actually take from the UI.
* Auto-clearing: when the underlying failure mode resolves (a clean
``completed`` event arrives, a spawn succeeds, the task gets
unblocked), the diagnostic stops firing. The audit event trail stays.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Optional
import json
import time
# Severity rungs, ordered least → most urgent. The UI colors them
# amber (warning), orange (error), red (critical). Sorted outputs put
# critical first so operators see the worst fires at the top.
SEVERITY_ORDER = ("warning", "error", "critical")
@dataclass
class DiagnosticAction:
    """A single recovery action attached to a diagnostic.

    The ``kind`` determines how both the UI and CLI render it:

    * ``reclaim`` / ``reassign`` — POST to the matching /tasks/:id/*
      endpoint; dashboard wires into the existing recovery popover.
    * ``unblock`` — PATCH status back to ``ready`` (for stuck-blocked
      diagnostics).
    * ``cli_hint`` — print/copy a shell command (e.g.
      ``hermes -p <profile> auth``). No HTTP side effect.
    * ``open_docs`` — deep-link to the docs URL named in ``payload.url``.
    * ``comment`` — nudge the operator to add a comment (for
      stuck-blocked tasks that need human input).

    ``suggested=True`` marks the action as the recommended first step;
    the UI highlights it. Multiple actions can be suggested if they're
    equally valid.
    """

    kind: str
    label: str
    payload: dict = field(default_factory=dict)
    suggested: bool = False

    def to_dict(self) -> dict:
        return {
            "kind": self.kind,
            "label": self.label,
            "payload": self.payload,
            "suggested": self.suggested,
        }
@dataclass
class Diagnostic:
    """One active distress signal on a task."""

    kind: str
    severity: str  # "warning" | "error" | "critical"
    title: str
    detail: str
    actions: list[DiagnosticAction] = field(default_factory=list)
    first_seen_at: int = 0
    last_seen_at: int = 0
    count: int = 1
    # Optional: the run id this diagnostic is scoped to. None = task-wide.
    run_id: Optional[int] = None
    # Optional structured payload for the UI (phantom ids, failure count).
    data: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "kind": self.kind,
            "severity": self.severity,
            "title": self.title,
            "detail": self.detail,
            "actions": [a.to_dict() for a in self.actions],
            "first_seen_at": self.first_seen_at,
            "last_seen_at": self.last_seen_at,
            "count": self.count,
            "run_id": self.run_id,
            "data": self.data,
        }
# ---------------------------------------------------------------------------
# Rule helpers
# ---------------------------------------------------------------------------

def _task_field(task, name, default=None):
    """Read a field from a task regardless of representation.

    Callers pass sqlite3.Row (dict-like with [] but no attribute
    access), kanban_db.Task dataclasses (attribute access), or plain
    dicts (both). This normalises them so rule functions don't have
    to branch on type each time.
    """
    if task is None:
        return default
    # sqlite Row + plain dicts both support mapping access; Row also
    # supports .keys().
    try:
        # Row raises IndexError if the key isn't a column in the query;
        # dicts return default via .get. Handle both.
        if hasattr(task, "keys") and name in task.keys():
            return task[name]
    except Exception:
        pass
    if isinstance(task, dict):
        return task.get(name, default)
    return getattr(task, name, default)
def _parse_payload(ev) -> dict:
    """Tolerate event.payload being either a dict or a JSON string."""
    p = _task_field(ev, "payload", None)
    if p is None:
        return {}
    if isinstance(p, dict):
        return p
    if isinstance(p, str):
        try:
            return json.loads(p) or {}
        except Exception:
            return {}
    return {}
def _event_kind(ev) -> str:
    return _task_field(ev, "kind", "") or ""


def _event_ts(ev) -> int:
    t = _task_field(ev, "created_at", 0)
    return int(t or 0)


def _active_hallucination_events(
    events: Iterable[Any],
    kind: str,
) -> list[Any]:
    """Return events of ``kind`` that have no ``completed``/``edited``
    event *strictly after* them. Walks chronologically: each clean
    event resets the accumulator; each matching event gets appended.

    Events must be sorted by id (i.e. arrival order); callers pass the
    task's full event list, which the DB already returns in that order.
    """
    # Events arrive sorted by id asc (chronological). Walk once, track
    # which hallucination events are still "active" (no clean event
    # supersedes them).
    active: list[Any] = []
    for ev in events:
        k = _event_kind(ev)
        if k in ("completed", "edited"):
            active.clear()
        elif k == kind:
            active.append(ev)
    return active
def _latest_clean_event_ts(events: Iterable[Any]) -> int:
    """Timestamp of the most recent clean completion / edit event.

    Kept for general "has this task ever been successfully completed"
    lookups; hallucination rules use ``_active_hallucination_events``
    instead because they need strict ordering.
    """
    latest = 0
    for ev in events:
        if _event_kind(ev) in ("completed", "edited"):
            t = _event_ts(ev)
            if t > latest:
                latest = t
    return latest


# Standard always-available actions. Every diagnostic can offer these as
# fallbacks regardless of kind — they're the two baseline recovery
# primitives the kernel supports.
def _generic_recovery_actions(task: Any, *, running: bool) -> list[DiagnosticAction]:
    out: list[DiagnosticAction] = []
    if running:
        out.append(DiagnosticAction(
            kind="reclaim",
            label="Reclaim task",
            payload={},
        ))
    out.append(DiagnosticAction(
        kind="reassign",
        label="Reassign to different profile",
        payload={"reclaim_first": running},
    ))
    return out
# ---------------------------------------------------------------------------
# Rule implementations
# ---------------------------------------------------------------------------
# Each rule takes (task, events, runs, now_ts, config) and returns
# zero or more Diagnostic instances. ``events`` / ``runs`` are lists of
# kanban_db.Event / kanban_db.Run (or plain dicts matching the same
# shape — for test convenience).
RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]]


def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]:
    """Blocked-hallucination gate fires: a worker called kanban_complete
    with created_cards that didn't exist or weren't created by the
    completing profile. The task stayed in its prior state; the operator
    needs to decide how to proceed.

    Auto-clears when a successful completion (or edit) follows the
    blocked event.
    """
    hits = _active_hallucination_events(events, "completion_blocked_hallucination")
    if not hits:
        return []
    phantom_ids: list[str] = []
    first = _event_ts(hits[0])
    last = _event_ts(hits[-1])
    for ev in hits:
        payload = _parse_payload(ev)
        for pid in payload.get("phantom_cards", []) or []:
            if pid not in phantom_ids:
                phantom_ids.append(pid)
    running = _task_field(task, "status") == "running"
    actions: list[DiagnosticAction] = []
    actions.append(DiagnosticAction(
        kind="comment",
        label="Add a comment explaining what to do",
        suggested=False,
    ))
    actions.extend(_generic_recovery_actions(task, running=running))
    return [Diagnostic(
        kind="hallucinated_cards",
        severity="error",
        title="Worker claimed cards that don't exist",
        detail=(
            "The completing worker declared created_cards that either didn't "
            "exist or weren't created by its profile. The completion was "
            "blocked and the task stayed in its prior state. "
            "Usually means the worker hallucinated ids instead of capturing "
            "return values from kanban_create."
        ),
        actions=actions,
        first_seen_at=first,
        last_seen_at=last,
        count=len(hits),
        data={"phantom_ids": phantom_ids},
    )]
def _rule_prose_phantom_refs(task, events, runs, now, cfg) -> list[Diagnostic]:
    """Advisory prose-scan: the completion summary mentions ``t_<hex>``
    ids that don't resolve. Non-blocking; surfaced as a warning only.

    Auto-clears when a fresh clean completion arrives AFTER the
    suspected event.
    """
    hits = _active_hallucination_events(events, "suspected_hallucinated_references")
    if not hits:
        return []
    phantom_refs: list[str] = []
    for ev in hits:
        for pid in _parse_payload(ev).get("phantom_refs", []) or []:
            if pid not in phantom_refs:
                phantom_refs.append(pid)
    running = _task_field(task, "status") == "running"
    return [Diagnostic(
        kind="prose_phantom_refs",
        severity="warning",
        title="Completion summary references unknown task ids",
        detail=(
            "The completion summary mentions task ids that don't resolve "
            "in this board's database. The completion itself succeeded, "
            "but downstream consumers parsing the summary may be pointed "
            "at cards that never existed."
        ),
        actions=_generic_recovery_actions(task, running=running),
        first_seen_at=_event_ts(hits[0]),
        last_seen_at=_event_ts(hits[-1]),
        count=len(hits),
        data={"phantom_refs": phantom_refs},
    )]
def _rule_repeated_spawn_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
    """Task's ``spawn_failures`` counter is climbing — the worker can't
    even start. Usually a profile misconfiguration (missing config.yaml,
    bad PATH/venv, wrong credentials).

    Threshold: cfg["spawn_failure_threshold"] (default 3).
    """
    threshold = int(cfg.get("spawn_failure_threshold", 3))
    failures = _task_field(task, "spawn_failures", 0)
    if failures is None or failures < threshold:
        return []
    last_err = _task_field(task, "last_spawn_error")
    assignee = _task_field(task, "assignee")
    actions: list[DiagnosticAction] = []
    if assignee and assignee != "default":
        actions.append(DiagnosticAction(
            kind="cli_hint",
            label=f"Verify profile: hermes -p {assignee} doctor",
            payload={"command": f"hermes -p {assignee} doctor"},
            suggested=True,
        ))
        actions.append(DiagnosticAction(
            kind="cli_hint",
            label=f"Fix profile auth: hermes -p {assignee} auth",
            payload={"command": f"hermes -p {assignee} auth"},
        ))
    actions.extend(_generic_recovery_actions(task, running=False))
    severity = "critical" if failures >= threshold * 2 else "error"
    err_text = (last_err or "").strip()
    # Truncate defensively — spawn errors can carry full tracebacks.
    err_snippet = err_text[:500] + ("…" if len(err_text) > 500 else "")
    if err_snippet:
        title = f"Agent spawn failed {failures}x: {err_snippet.splitlines()[0][:160]}"
        detail = (
            f"The dispatcher tried to launch a worker {failures} times "
            f"and failed every time. Full last error:\n\n{err_snippet}\n\n"
            f"Common causes: missing config.yaml, bad venv/PATH, or "
            f"missing credentials for the profile's configured provider."
        )
    else:
        title = f"Agent spawn failed {failures}x (no error recorded)"
        detail = (
            f"The dispatcher tried to launch a worker {failures} times "
            f"and failed every time, but no error text was captured. "
            f"Usually a profile configuration issue — check profile "
            f"health with the suggested command."
        )
    return [Diagnostic(
        kind="repeated_spawn_failures",
        severity=severity,
        title=title,
        detail=detail,
        actions=actions,
        first_seen_at=now,
        last_seen_at=now,
        count=failures,
        data={"spawn_failures": failures, "last_spawn_error": last_err},
    )]
def _rule_repeated_crashes(task, events, runs, now, cfg) -> list[Diagnostic]:
    """The worker spawns fine but keeps crashing mid-run. Check the last
    N runs' outcomes; N consecutive ``crashed`` without a successful
    ``completed`` means something about the task + profile combo is
    broken (OOM, missing dependency, tool it needs is down).

    Threshold: cfg["crash_threshold"] (default 2).
    """
    threshold = int(cfg.get("crash_threshold", 2))
    ordered = sorted(runs, key=lambda r: _task_field(r, "id", 0))
    # Count trailing consecutive 'crashed' outcomes.
    consecutive = 0
    last_err = None
    for r in reversed(ordered):
        outcome = _task_field(r, "outcome")
        if outcome == "crashed":
            consecutive += 1
            if last_err is None:
                last_err = _task_field(r, "error")
        elif outcome in ("completed", "reclaimed"):
            # A success (or manual reclaim) breaks the streak.
            break
        else:
            # Other outcomes (timed_out, blocked, spawn_failed, gave_up)
            # aren't crash signals — don't count them, but they also
            # don't break the crash streak.
            continue
    if consecutive < threshold:
        return []
    task_id = _task_field(task, "id")
    actions: list[DiagnosticAction] = []
    if task_id:
        actions.append(DiagnosticAction(
            kind="cli_hint",
            label=f"Check logs: hermes kanban log {task_id}",
            payload={"command": f"hermes kanban log {task_id}"},
            suggested=True,
        ))
    running = _task_field(task, "status") == "running"
    actions.extend(_generic_recovery_actions(task, running=running))
    severity = "critical" if consecutive >= threshold * 2 else "error"
    # Put the actual error up-front so operators see WHAT broke without
    # having to open the logs. Truncate defensively — these can be huge
    # (full tracebacks).
    err_text = (last_err or "").strip()
    err_snippet = err_text[:500] + ("…" if len(err_text) > 500 else "")
    if err_snippet:
        title = f"Agent crashed {consecutive}x: {err_snippet.splitlines()[0][:160]}"
        detail = (
            f"The last {consecutive} runs ended with outcome=crashed. "
            f"Full last error:\n\n{err_snippet}"
        )
    else:
        title = f"Agent crashed {consecutive}x (no error recorded)"
        detail = (
            f"The last {consecutive} runs ended with outcome=crashed but "
            f"no error text was captured. Check the worker log for more."
        )
    return [Diagnostic(
        kind="repeated_crashes",
        severity=severity,
        title=title,
        detail=detail,
        actions=actions,
        first_seen_at=now,
        last_seen_at=now,
        count=consecutive,
        data={"consecutive_crashes": consecutive, "last_error": last_err},
    )]
def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
    """Task has been in ``blocked`` status for too long without a comment.

    Threshold: cfg["blocked_stale_hours"] (default 24).
    Surfaced as a warning so humans know there's a pending unblock.
    """
    hours = float(cfg.get("blocked_stale_hours", 24))
    status = _task_field(task, "status")
    if status != "blocked":
        return []
    # Find the most recent ``blocked`` event.
    last_blocked_ts = 0
    for ev in events:
        if _event_kind(ev) == "blocked":
            t = _event_ts(ev)
            if t > last_blocked_ts:
                last_blocked_ts = t
    if last_blocked_ts == 0:
        return []
    age_hours = (now - last_blocked_ts) / 3600.0
    if age_hours < hours:
        return []
    # Any comment / unblock after the block breaks the "stale" signal.
    for ev in events:
        if _event_kind(ev) in ("commented", "unblocked") and _event_ts(ev) > last_blocked_ts:
            return []
    actions: list[DiagnosticAction] = [
        DiagnosticAction(
            kind="comment",
            label="Add a comment / unblock the task",
            suggested=True,
        ),
    ]
    return [Diagnostic(
        kind="stuck_in_blocked",
        severity="warning",
        title=f"Task has been blocked for {int(age_hours)}h",
        detail=(
            f"This task transitioned to blocked {int(age_hours)}h ago and "
            f"has had no comments or unblock attempts since. Blocked tasks "
            f"are waiting for human input — check the block reason and "
            f"either unblock with feedback or answer with a comment."
        ),
        actions=actions,
        first_seen_at=last_blocked_ts,
        last_seen_at=last_blocked_ts,
        count=1,
        data={"blocked_at": last_blocked_ts, "age_hours": round(age_hours, 1)},
    )]
# Registry — order matters: rules higher on the list render first when
# severity ties. Add new rules here.
_RULES: list[RuleFn] = [
    _rule_hallucinated_cards,
    _rule_prose_phantom_refs,
    _rule_repeated_spawn_failures,
    _rule_repeated_crashes,
    _rule_stuck_in_blocked,
]

# Known kinds (for the UI's filter / legend / i18n keys). Update when
# rules are added.
DIAGNOSTIC_KINDS = (
    "hallucinated_cards",
    "prose_phantom_refs",
    "repeated_spawn_failures",
    "repeated_crashes",
    "stuck_in_blocked",
)

DEFAULT_CONFIG = {
    "spawn_failure_threshold": 3,
    "crash_threshold": 2,
    "blocked_stale_hours": 24,
}
def compute_task_diagnostics(
    task,
    events: list,
    runs: list,
    *,
    now: Optional[int] = None,
    config: Optional[dict] = None,
) -> list[Diagnostic]:
    """Run every rule against a single task's state and return a
    severity-sorted list of active diagnostics.

    Sorting: critical first, then error, then warning; ties broken by
    most-recent ``last_seen_at``.
    """
    now_ts = int(now if now is not None else time.time())
    cfg = {**DEFAULT_CONFIG, **(config or {})}
    out: list[Diagnostic] = []
    for rule in _RULES:
        try:
            out.extend(rule(task, events, runs, now_ts, cfg))
        except Exception:
            # A broken rule must never crash the dashboard. Rule bugs
            # get caught in tests; in production we'd rather drop the
            # diagnostic than 500 a whole /board request.
            continue
    severity_idx = {s: i for i, s in enumerate(SEVERITY_ORDER)}
    out.sort(
        key=lambda d: (
            -severity_idx.get(d.severity, -1),
            -(d.last_seen_at or 0),
        )
    )
    return out
def severity_of_highest(diagnostics: Iterable[Diagnostic]) -> Optional[str]:
    """Highest severity present in the list, or None if empty. Useful
    for card badges that need a single color."""
    highest_idx = -1
    highest = None
    for d in diagnostics:
        idx = SEVERITY_ORDER.index(d.severity) if d.severity in SEVERITY_ORDER else -1
        if idx > highest_idx:
            highest_idx = idx
            highest = d.severity
    return highest