From 3b122cc1ac3ae91e690ec8b29e54af10e52fdb18 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 10 May 2026 21:58:44 -0700 Subject: [PATCH] feat(kanban): stranded_in_ready diagnostic for unclaimed tasks (#23578) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Surface ready tasks that nobody claims within a threshold (default 30 min) regardless of why. One identity-agnostic signal that catches: - Operator typo'd the assignee - Profile was deleted, leaving its tasks stranded - External worker pool (Codex CLI lane, custom daemon) is down - Dispatcher misconfigured (wrong board / wrong HERMES_HOME) Today the dispatcher correctly skips these (no respawn loop, good) but nothing surfaces the fact that operator-actionable work is accumulating. The new `stranded_in_ready` rule does that without requiring a manual lane registry — it reads the most recent ready-transition event (`created` / `promoted` / `reclaimed` / `unblocked`) and fires when (now - last_ready_ts) >= threshold. Severity escalates with age: warning at threshold, error at 2x, critical at 6x. The cli_hint and reassign actions point operators at the right next step. Out of scope deliberately: - Lane registry (#20157 closed) — this signal supersedes it. - Pushing the diagnostic into messaging gateways — diagnostics are pull-only via 'hermes kanban diagnostics' for now; gateway push is a separate UX decision. Tests: 10 new + 461 existing kanban tests pass. Verified end-to-end via 'hermes kanban diagnostics --json' against a 2h-old stranded task — surfaces as error severity with correct actions. 
--- hermes_cli/kanban_diagnostics.py | 130 +++++++++++++ tests/hermes_cli/test_kanban_diagnostics.py | 184 +++++++++++++++++- .../features/kanban-worker-lanes.md | 1 + 3 files changed, 311 insertions(+), 4 deletions(-) diff --git a/hermes_cli/kanban_diagnostics.py b/hermes_cli/kanban_diagnostics.py index d2ba26cb835..6e426ab5dfb 100644 --- a/hermes_cli/kanban_diagnostics.py +++ b/hermes_cli/kanban_diagnostics.py @@ -570,6 +570,130 @@ def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]: )] +def _rule_stranded_in_ready(task, events, runs, now, cfg) -> list[Diagnostic]: + """Task has been in ``ready`` status for too long without any worker + claiming it. + + Threshold: cfg["stranded_threshold_seconds"] (default 1800 = 30 min). + + Catches every "task waiting for a worker that never comes" case + without caring WHY: + + * Operator typo'd the assignee — no profile or external worker matches. + * Profile was deleted, leaving its tasks stranded. + * External worker pool (Codex CLI, Claude Code lane, custom daemon) + is down, hung, or wasn't started. + * Dispatcher is misconfigured (wrong board, wrong HERMES_HOME). + + Pre-rule, all of these silently rotted in ``skipped_nonspawnable`` — + the dispatcher correctly skipped them (good — no respawn loop) but + nobody surfaced the fact that operator-actionable work was + accumulating. The rule fires when a ready task's promoted-to-ready + timestamp is older than the threshold AND the assignee is non-empty + (truly unassigned tasks have their own ``skipped_unassigned`` signal + on the dispatcher and a different operator response). + + The signal is age-based on purpose: it's identity-agnostic, so it + works for Hermes profiles, registered lanes, external workers, and + typos uniformly. No registry to curate, no per-board allowlist. 
+ """ + threshold_seconds = float( + cfg.get("stranded_threshold_seconds", 30 * 60) + ) + status = _task_field(task, "status") + if status != "ready": + return [] + # Skip tasks with a live claim — they're being worked on, even if + # the worker hasn't reported progress yet (run-level liveness + # extends the claim TTL; we don't want to second-guess that here). + if _task_field(task, "claim_lock"): + return [] + assignee = _task_field(task, "assignee") or "" + if not assignee.strip(): + # Unassigned tasks: the dispatcher's ``skipped_unassigned`` is + # already the right signal. A separate diagnostic here would + # double-flag the same condition. + return [] + + # Find the most recent event that put this task into ready. + # ``created`` covers tasks born ready; ``promoted`` covers parent- + # done auto-promotion; ``reclaimed`` covers TTL/crash recovery; + # ``unblocked`` covers human-driven resumes. + READY_TRANSITION_KINDS = { + "created", "promoted", "reclaimed", "unblocked", + } + last_ready_ts = 0 + for ev in events: + if _event_kind(ev) in READY_TRANSITION_KINDS: + t = _event_ts(ev) + if t > last_ready_ts: + last_ready_ts = t + + # Fallback: if no qualifying event exists (very old task or events + # truncated), fall back to ``created_at`` on the task row. Better + # to occasionally over-flag an ancient task than miss a stranded one. + if last_ready_ts == 0: + last_ready_ts = int(_task_field(task, "created_at", default=0) or 0) + if last_ready_ts == 0: + return [] + + age_seconds = now - last_ready_ts + if age_seconds < threshold_seconds: + return [] + + # Format the age in the largest sensible unit. + if age_seconds >= 3600: + age_str = f"{age_seconds / 3600:.1f}h" + else: + age_str = f"{int(age_seconds / 60)}m" + + # Severity escalates with age. Below 2x threshold = warning; + # 2x – 6x = error; beyond 6x = critical (something is clearly + # broken, not just slow). 
+ if age_seconds >= threshold_seconds * 6: + severity = "critical" + elif age_seconds >= threshold_seconds * 2: + severity = "error" + else: + severity = "warning" + + actions = [ + DiagnosticAction( + kind="reassign", + label="Reassign to a different worker", + payload={"current_assignee": assignee}, + ), + DiagnosticAction( + kind="cli_hint", + label="Check dispatcher status", + payload={"command": "hermes kanban diagnostics"}, + ), + ] + + return [Diagnostic( + kind="stranded_in_ready", + severity=severity, + title=f"Ready for {age_str} with no worker", + detail=( + f"This task has been ready for {age_str} but nothing has " + f"claimed it. Common causes: assignee {assignee!r} is " + f"misspelled, the profile was deleted, or the external " + f"worker pool for this lane is down. Confirm the assignee " + f"is correct and that a worker is actually polling for it." + ), + actions=actions, + first_seen_at=last_ready_ts, + last_seen_at=last_ready_ts, + count=1, + data={ + "ready_since": last_ready_ts, + "age_seconds": int(age_seconds), + "assignee": assignee, + "threshold_seconds": int(threshold_seconds), + }, + )] + + # Registry — order matters: rules higher on the list render first when # severity ties. Add new rules here. _RULES: list[RuleFn] = [ @@ -578,6 +702,7 @@ _RULES: list[RuleFn] = [ _rule_repeated_failures, _rule_repeated_crashes, _rule_stuck_in_blocked, + _rule_stranded_in_ready, ] @@ -589,6 +714,7 @@ DIAGNOSTIC_KINDS = ( "repeated_failures", "repeated_crashes", "stuck_in_blocked", + "stranded_in_ready", ) @@ -598,6 +724,10 @@ DEFAULT_CONFIG = { "spawn_failure_threshold": 3, "crash_threshold": 2, "blocked_stale_hours": 24, + # Stranded-task threshold. 30 min by default — below that, the + # signal is dominated by tasks that are about to be claimed on the + # next dispatcher tick (default 60s) and would just be noise. 
+ "stranded_threshold_seconds": 30 * 60, } diff --git a/tests/hermes_cli/test_kanban_diagnostics.py b/tests/hermes_cli/test_kanban_diagnostics.py index d39695ca94d..ad00e4136a8 100644 --- a/tests/hermes_cli/test_kanban_diagnostics.py +++ b/tests/hermes_cli/test_kanban_diagnostics.py @@ -75,10 +75,13 @@ def test_hallucinated_cards_fires_on_blocked_event(): phantom_cards=["t_bad1", "t_bad2"], verified_cards=["t_good1"]), ] - diags = kd.compute_task_diagnostics(task, events, []) - assert len(diags) == 1 - d = diags[0] - assert d.kind == "hallucinated_cards" + # ``now=300`` keeps the synthetic event timestamps in scope without + # tripping the stranded_in_ready rule (events are 100/200 epoch + # which time.time() would treat as ~50yr old). + diags = kd.compute_task_diagnostics(task, events, [], now=300) + halluc = [d for d in diags if d.kind == "hallucinated_cards"] + assert len(halluc) == 1 + d = halluc[0] assert d.severity == "error" assert d.data["phantom_ids"] == ["t_bad1", "t_bad2"] # Generic recovery actions always available; comment action too. @@ -379,3 +382,176 @@ def test_broken_rule_is_isolated(monkeypatch): # The broken rule silently drops, the real one still fires. kinds = [d.kind for d in diags] assert "repeated_failures" in kinds + + +# --------------------------------------------------------------------------- +# stranded_in_ready +# +# Surfaces ready tasks that nobody has claimed within the threshold. +# Identity-agnostic by design: catches typo'd assignees, deleted profiles, +# down external worker pools, and misconfigured dispatchers in one rule. +# --------------------------------------------------------------------------- + + +def test_stranded_in_ready_fires_when_age_exceeds_threshold(): + """Default threshold = 30 min. A ready task promoted 45 min ago + with no claim should fire as a warning.""" + now = 100_000 + task = _task(status="ready", assignee="demo", claim_lock=None) + # 45 min = 2700s, threshold = 1800s. 
+ events = [_event("created", ts=now - 45 * 60)] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + stranded = [d for d in diags if d.kind == "stranded_in_ready"] + assert len(stranded) == 1 + assert stranded[0].severity == "warning" + assert stranded[0].data["age_seconds"] == 45 * 60 + assert stranded[0].data["assignee"] == "demo" + + +def test_stranded_in_ready_silent_below_threshold(): + """A ready task only 10 min old should NOT fire.""" + now = 100_000 + task = _task(status="ready", assignee="demo", claim_lock=None) + events = [_event("created", ts=now - 10 * 60)] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + assert [d for d in diags if d.kind == "stranded_in_ready"] == [] + + +def test_stranded_in_ready_skips_non_ready_status(): + """Tasks not in ready status are out of scope (running tasks have + their own crash / failure rules).""" + now = 100_000 + for status in ("running", "blocked", "done", "todo", "triage"): + task = _task(status=status, assignee="demo") + events = [_event("created", ts=now - 6 * 3600)] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + assert [d for d in diags if d.kind == "stranded_in_ready"] == [], status + + +def test_stranded_in_ready_skips_unassigned_tasks(): + """Empty assignee = `skipped_unassigned` on the dispatcher already. + Don't double-flag here.""" + now = 100_000 + task = _task(status="ready", assignee="", claim_lock=None) + events = [_event("created", ts=now - 6 * 3600)] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + assert [d for d in diags if d.kind == "stranded_in_ready"] == [] + + +def test_stranded_in_ready_skips_claimed_tasks(): + """A live claim_lock means a worker is on it — even an old one. 
Don't + second-guess: the run-level liveness signal owns that decision.""" + now = 100_000 + task = _task( + status="ready", assignee="demo", claim_lock="run_xyz", + ) + events = [_event("created", ts=now - 6 * 3600)] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + assert [d for d in diags if d.kind == "stranded_in_ready"] == [] + + +def test_stranded_in_ready_uses_latest_ready_transition(): + """When multiple ready-transition events exist, the rule should + age-from the most recent — a task reclaimed 20 min ago is NOT + stranded for 6h even if it was first created 6h ago.""" + now = 100_000 + task = _task(status="ready", assignee="demo") + events = [ + _event("created", ts=now - 6 * 3600), # 6 h ago + _event("reclaimed", ts=now - 20 * 60), # 20 min ago — wins + ] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + assert [d for d in diags if d.kind == "stranded_in_ready"] == [] + + +def test_stranded_in_ready_severity_escalates_with_age(): + """warning → error → critical at 2x and 6x threshold.""" + now = 100_000 + task = _task(status="ready", assignee="demo") + # Default threshold = 1800s. + cases = [ + (45 * 60, "warning"), # 1.5x → warning + (90 * 60, "error"), # 3x → error + (4 * 3600, "critical"), # 8x → critical + ] + for age, expected in cases: + events = [_event("created", ts=now - age)] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + stranded = [d for d in diags if d.kind == "stranded_in_ready"] + assert len(stranded) == 1, f"age={age}" + assert stranded[0].severity == expected, ( + f"age={age} expected {expected}, got {stranded[0].severity}" + ) + + +def test_stranded_in_ready_respects_config_override(): + """Config override changes the threshold.""" + now = 100_000 + task = _task(status="ready", assignee="demo") + events = [_event("created", ts=now - 10 * 60)] # 10 min + # Default 30 min — wouldn't fire. 
+ diags = kd.compute_task_diagnostics(task, events, [], now=now) + assert [d for d in diags if d.kind == "stranded_in_ready"] == [] + # Lower the threshold to 5 min — now it fires. + diags = kd.compute_task_diagnostics( + task, events, [], now=now, + config={"stranded_threshold_seconds": 5 * 60}, + ) + stranded = [d for d in diags if d.kind == "stranded_in_ready"] + assert len(stranded) == 1 + + +def test_stranded_in_ready_falls_back_to_created_at(): + """When events have no ready-transition kind, the rule falls back + to the task's ``created_at`` so an ancient stranded task isn't + invisible just because its events got pruned.""" + now = 100_000 + task = _task( + status="ready", assignee="demo", created_at=now - 4 * 3600, + ) + # No qualifying events. + events = [_event("commented", ts=now - 100)] + diags = kd.compute_task_diagnostics(task, events, [], now=now) + stranded = [d for d in diags if d.kind == "stranded_in_ready"] + assert len(stranded) == 1 + assert stranded[0].data["age_seconds"] == 4 * 3600 + + +def test_stranded_in_ready_works_on_real_db_row(kanban_home): + """Round-trip through real kanban_db.connect() — confirms the rule + works on sqlite3.Row objects, not just dicts.""" + import time as _t + conn = kb.connect() + try: + # Create a task and force its created_at into the past. + tid = kb.create_task(conn, title="stranded one", assignee="ghost") + old_ts = int(_t.time()) - 90 * 60 # 90 min old + conn.execute( + "UPDATE tasks SET status = 'ready', created_at = ? WHERE id = ?", + (old_ts, tid), + ) + conn.commit() + + task_row = conn.execute( + "SELECT * FROM tasks WHERE id = ?", (tid,) + ).fetchone() + events = list(conn.execute( + "SELECT * FROM task_events WHERE task_id = ? ORDER BY created_at", + (tid,), + ).fetchall()) + # Override created event timestamps too so age calc lines up. + conn.execute( + "UPDATE task_events SET created_at = ? 
WHERE task_id = ?", + (old_ts, tid), + ) + conn.commit() + events = list(conn.execute( + "SELECT * FROM task_events WHERE task_id = ?", (tid,), + ).fetchall()) + + diags = kd.compute_task_diagnostics(task_row, events, []) + stranded = [d for d in diags if d.kind == "stranded_in_ready"] + assert len(stranded) == 1 + assert stranded[0].data["assignee"] == "ghost" + finally: + conn.close() diff --git a/website/docs/user-guide/features/kanban-worker-lanes.md b/website/docs/user-guide/features/kanban-worker-lanes.md index 630606eda24..675169f9892 100644 --- a/website/docs/user-guide/features/kanban-worker-lanes.md +++ b/website/docs/user-guide/features/kanban-worker-lanes.md @@ -104,6 +104,7 @@ So lane authors don't have to reimplement these: - **Crashed worker** — a worker whose host-local PID has vanished is detected by `detect_crashed_workers` and reaped; the task increments `consecutive_failures` and may auto-block when the breaker trips. - **Run-level retry** — when a task is retried (post-block, post-crash, post-reclaim), the worker can use the `expected_run_id` parameter on terminating tools to fail fast if its own run was already superseded. - **Per-task max runtime** — `task.max_runtime_seconds` hard-caps wall-clock time per run, regardless of PID liveness. Catches genuinely-deadlocked workers that the live-PID extension would otherwise keep running. +- **Stranded-task detection** — a ready task whose assignee never produces a claim within `kanban.stranded_threshold_seconds` (default 30 min) shows up in `hermes kanban diagnostics` as a `stranded_in_ready` warning. Severity escalates to error at 2x the threshold and critical at 6x. Catches typo'd assignees, deleted profiles, and down external worker pools in one signal — identity-agnostic, no per-board allowlist to curate. ## Related