mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
feat(kanban): stranded_in_ready diagnostic for unclaimed tasks (#23578)
Surface ready tasks that nobody claims within a threshold (default 30 min) regardless of why. One identity-agnostic signal that catches: - Operator typo'd the assignee - Profile was deleted, leaving its tasks stranded - External worker pool (Codex CLI lane, custom daemon) is down - Dispatcher misconfigured (wrong board / wrong HERMES_HOME) Today the dispatcher correctly skips these (no respawn loop, good) but nothing surfaces the fact that operator-actionable work is accumulating. The new `stranded_in_ready` rule does that without requiring a manual lane registry — it reads the most recent ready- transition event (`created` / `promoted` / `reclaimed` / `unblocked`) and fires when (now - last_ready_ts) > threshold. Severity escalates with age: warning at threshold, error at 2x, critical at 6x. The cli_hint and reassign actions point operators at the right next step. Out of scope deliberately: - Lane registry (#20157 closed) — this signal supersedes it. - Pushing the diagnostic into messaging gateways — diagnostics are pull-only via 'hermes kanban diagnostics' for now; gateway push is a separate UX decision. Tests: 10 new + 461 existing kanban tests pass. E2E verified end- to-end via 'hermes kanban diagnostics --json' against a 2h-old stranded task — surfaces as error severity with correct actions.
This commit is contained in:
parent
bf5b8a7d61
commit
3b122cc1ac
3 changed files with 311 additions and 4 deletions
|
|
@ -570,6 +570,130 @@ def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|||
)]
|
||||
|
||||
|
||||
def _rule_stranded_in_ready(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||
"""Task has been in ``ready`` status for too long without any worker
|
||||
claiming it.
|
||||
|
||||
Threshold: cfg["stranded_threshold_seconds"] (default 1800 = 30 min).
|
||||
|
||||
Catches every "task waiting for a worker that never comes" case
|
||||
without caring WHY:
|
||||
|
||||
* Operator typo'd the assignee — no profile or external worker matches.
|
||||
* Profile was deleted, leaving its tasks stranded.
|
||||
* External worker pool (Codex CLI, Claude Code lane, custom daemon)
|
||||
is down, hung, or wasn't started.
|
||||
* Dispatcher is misconfigured (wrong board, wrong HERMES_HOME).
|
||||
|
||||
Pre-rule, all of these silently rotted in ``skipped_nonspawnable`` —
|
||||
the dispatcher correctly skipped them (good — no respawn loop) but
|
||||
nobody surfaced the fact that operator-actionable work was
|
||||
accumulating. The rule fires when a ready task's promoted-to-ready
|
||||
timestamp is older than the threshold AND the assignee is non-empty
|
||||
(truly unassigned tasks have their own ``skipped_unassigned`` signal
|
||||
on the dispatcher and a different operator response).
|
||||
|
||||
The signal is age-based on purpose: it's identity-agnostic, so it
|
||||
works for Hermes profiles, registered lanes, external workers, and
|
||||
typos uniformly. No registry to curate, no per-board allowlist.
|
||||
"""
|
||||
threshold_seconds = float(
|
||||
cfg.get("stranded_threshold_seconds", 30 * 60)
|
||||
)
|
||||
status = _task_field(task, "status")
|
||||
if status != "ready":
|
||||
return []
|
||||
# Skip tasks with a live claim — they're being worked on, even if
|
||||
# the worker hasn't reported progress yet (run-level liveness
|
||||
# extends the claim TTL; we don't want to second-guess that here).
|
||||
if _task_field(task, "claim_lock"):
|
||||
return []
|
||||
assignee = _task_field(task, "assignee") or ""
|
||||
if not assignee.strip():
|
||||
# Unassigned tasks: the dispatcher's ``skipped_unassigned`` is
|
||||
# already the right signal. A separate diagnostic here would
|
||||
# double-flag the same condition.
|
||||
return []
|
||||
|
||||
# Find the most recent event that put this task into ready.
|
||||
# ``created`` covers tasks born ready; ``promoted`` covers parent-
|
||||
# done auto-promotion; ``reclaimed`` covers TTL/crash recovery;
|
||||
# ``unblocked`` covers human-driven resumes.
|
||||
READY_TRANSITION_KINDS = {
|
||||
"created", "promoted", "reclaimed", "unblocked",
|
||||
}
|
||||
last_ready_ts = 0
|
||||
for ev in events:
|
||||
if _event_kind(ev) in READY_TRANSITION_KINDS:
|
||||
t = _event_ts(ev)
|
||||
if t > last_ready_ts:
|
||||
last_ready_ts = t
|
||||
|
||||
# Fallback: if no qualifying event exists (very old task or events
|
||||
# truncated), fall back to ``created_at`` on the task row. Better
|
||||
# to occasionally over-flag an ancient task than miss a stranded one.
|
||||
if last_ready_ts == 0:
|
||||
last_ready_ts = int(_task_field(task, "created_at", default=0) or 0)
|
||||
if last_ready_ts == 0:
|
||||
return []
|
||||
|
||||
age_seconds = now - last_ready_ts
|
||||
if age_seconds < threshold_seconds:
|
||||
return []
|
||||
|
||||
# Format the age in the largest sensible unit.
|
||||
if age_seconds >= 3600:
|
||||
age_str = f"{age_seconds / 3600:.1f}h"
|
||||
else:
|
||||
age_str = f"{int(age_seconds / 60)}m"
|
||||
|
||||
# Severity escalates with age. Below 2x threshold = warning;
|
||||
# 2x – 6x = error; beyond 6x = critical (something is clearly
|
||||
# broken, not just slow).
|
||||
if age_seconds >= threshold_seconds * 6:
|
||||
severity = "critical"
|
||||
elif age_seconds >= threshold_seconds * 2:
|
||||
severity = "error"
|
||||
else:
|
||||
severity = "warning"
|
||||
|
||||
actions = [
|
||||
DiagnosticAction(
|
||||
kind="reassign",
|
||||
label="Reassign to a different worker",
|
||||
payload={"current_assignee": assignee},
|
||||
),
|
||||
DiagnosticAction(
|
||||
kind="cli_hint",
|
||||
label="Check dispatcher status",
|
||||
payload={"command": "hermes kanban diagnostics"},
|
||||
),
|
||||
]
|
||||
|
||||
return [Diagnostic(
|
||||
kind="stranded_in_ready",
|
||||
severity=severity,
|
||||
title=f"Ready for {age_str} with no worker",
|
||||
detail=(
|
||||
f"This task has been ready for {age_str} but nothing has "
|
||||
f"claimed it. Common causes: assignee {assignee!r} is "
|
||||
f"misspelled, the profile was deleted, or the external "
|
||||
f"worker pool for this lane is down. Confirm the assignee "
|
||||
f"is correct and that a worker is actually polling for it."
|
||||
),
|
||||
actions=actions,
|
||||
first_seen_at=last_ready_ts,
|
||||
last_seen_at=last_ready_ts,
|
||||
count=1,
|
||||
data={
|
||||
"ready_since": last_ready_ts,
|
||||
"age_seconds": int(age_seconds),
|
||||
"assignee": assignee,
|
||||
"threshold_seconds": int(threshold_seconds),
|
||||
},
|
||||
)]
|
||||
|
||||
|
||||
# Registry — order matters: rules higher on the list render first when
|
||||
# severity ties. Add new rules here.
|
||||
_RULES: list[RuleFn] = [
|
||||
|
|
@ -578,6 +702,7 @@ _RULES: list[RuleFn] = [
|
|||
_rule_repeated_failures,
|
||||
_rule_repeated_crashes,
|
||||
_rule_stuck_in_blocked,
|
||||
_rule_stranded_in_ready,
|
||||
]
|
||||
|
||||
|
||||
|
|
@ -589,6 +714,7 @@ DIAGNOSTIC_KINDS = (
|
|||
"repeated_failures",
|
||||
"repeated_crashes",
|
||||
"stuck_in_blocked",
|
||||
"stranded_in_ready",
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -598,6 +724,10 @@ DEFAULT_CONFIG = {
|
|||
"spawn_failure_threshold": 3,
|
||||
"crash_threshold": 2,
|
||||
"blocked_stale_hours": 24,
|
||||
# Stranded-task threshold. 30 min by default — below that, the
|
||||
# signal is dominated by tasks that are about to be claimed on the
|
||||
# next dispatcher tick (default 60s) and would just be noise.
|
||||
"stranded_threshold_seconds": 30 * 60,
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -75,10 +75,13 @@ def test_hallucinated_cards_fires_on_blocked_event():
|
|||
phantom_cards=["t_bad1", "t_bad2"],
|
||||
verified_cards=["t_good1"]),
|
||||
]
|
||||
diags = kd.compute_task_diagnostics(task, events, [])
|
||||
assert len(diags) == 1
|
||||
d = diags[0]
|
||||
assert d.kind == "hallucinated_cards"
|
||||
# ``now=300`` keeps the synthetic event timestamps in scope without
|
||||
# tripping the stranded_in_ready rule (events are 100/200 epoch
|
||||
# which time.time() would treat as ~50yr old).
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=300)
|
||||
halluc = [d for d in diags if d.kind == "hallucinated_cards"]
|
||||
assert len(halluc) == 1
|
||||
d = halluc[0]
|
||||
assert d.severity == "error"
|
||||
assert d.data["phantom_ids"] == ["t_bad1", "t_bad2"]
|
||||
# Generic recovery actions always available; comment action too.
|
||||
|
|
@ -379,3 +382,176 @@ def test_broken_rule_is_isolated(monkeypatch):
|
|||
# The broken rule silently drops, the real one still fires.
|
||||
kinds = [d.kind for d in diags]
|
||||
assert "repeated_failures" in kinds
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# stranded_in_ready
|
||||
#
|
||||
# Surfaces ready tasks that nobody has claimed within the threshold.
|
||||
# Identity-agnostic by design: catches typo'd assignees, deleted profiles,
|
||||
# down external worker pools, and misconfigured dispatchers in one rule.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_stranded_in_ready_fires_when_age_exceeds_threshold():
|
||||
"""Default threshold = 30 min. A ready task promoted 45 min ago
|
||||
with no claim should fire as a warning."""
|
||||
now = 100_000
|
||||
task = _task(status="ready", assignee="demo", claim_lock=None)
|
||||
# 45 min = 2700s, threshold = 1800s.
|
||||
events = [_event("created", ts=now - 45 * 60)]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
stranded = [d for d in diags if d.kind == "stranded_in_ready"]
|
||||
assert len(stranded) == 1
|
||||
assert stranded[0].severity == "warning"
|
||||
assert stranded[0].data["age_seconds"] == 45 * 60
|
||||
assert stranded[0].data["assignee"] == "demo"
|
||||
|
||||
|
||||
def test_stranded_in_ready_silent_below_threshold():
|
||||
"""A ready task only 10 min old should NOT fire."""
|
||||
now = 100_000
|
||||
task = _task(status="ready", assignee="demo", claim_lock=None)
|
||||
events = [_event("created", ts=now - 10 * 60)]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
assert [d for d in diags if d.kind == "stranded_in_ready"] == []
|
||||
|
||||
|
||||
def test_stranded_in_ready_skips_non_ready_status():
|
||||
"""Tasks not in ready status are out of scope (running tasks have
|
||||
their own crash / failure rules)."""
|
||||
now = 100_000
|
||||
for status in ("running", "blocked", "done", "todo", "triage"):
|
||||
task = _task(status=status, assignee="demo")
|
||||
events = [_event("created", ts=now - 6 * 3600)]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
assert [d for d in diags if d.kind == "stranded_in_ready"] == [], status
|
||||
|
||||
|
||||
def test_stranded_in_ready_skips_unassigned_tasks():
|
||||
"""Empty assignee = `skipped_unassigned` on the dispatcher already.
|
||||
Don't double-flag here."""
|
||||
now = 100_000
|
||||
task = _task(status="ready", assignee="", claim_lock=None)
|
||||
events = [_event("created", ts=now - 6 * 3600)]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
assert [d for d in diags if d.kind == "stranded_in_ready"] == []
|
||||
|
||||
|
||||
def test_stranded_in_ready_skips_claimed_tasks():
|
||||
"""A live claim_lock means a worker is on it — even an old one. Don't
|
||||
second-guess: the run-level liveness signal owns that decision."""
|
||||
now = 100_000
|
||||
task = _task(
|
||||
status="ready", assignee="demo", claim_lock="run_xyz",
|
||||
)
|
||||
events = [_event("created", ts=now - 6 * 3600)]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
assert [d for d in diags if d.kind == "stranded_in_ready"] == []
|
||||
|
||||
|
||||
def test_stranded_in_ready_uses_latest_ready_transition():
|
||||
"""When multiple ready-transition events exist, the rule should
|
||||
age-from the most recent — a task reclaimed 20 min ago is NOT
|
||||
stranded for 6h even if it was first created 6h ago."""
|
||||
now = 100_000
|
||||
task = _task(status="ready", assignee="demo")
|
||||
events = [
|
||||
_event("created", ts=now - 6 * 3600), # 6 h ago
|
||||
_event("reclaimed", ts=now - 20 * 60), # 20 min ago — wins
|
||||
]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
assert [d for d in diags if d.kind == "stranded_in_ready"] == []
|
||||
|
||||
|
||||
def test_stranded_in_ready_severity_escalates_with_age():
|
||||
"""warning → error → critical at 2x and 6x threshold."""
|
||||
now = 100_000
|
||||
task = _task(status="ready", assignee="demo")
|
||||
# Default threshold = 1800s.
|
||||
cases = [
|
||||
(45 * 60, "warning"), # 1.5x → warning
|
||||
(90 * 60, "error"), # 3x → error
|
||||
(4 * 3600, "critical"), # 8x → critical
|
||||
]
|
||||
for age, expected in cases:
|
||||
events = [_event("created", ts=now - age)]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
stranded = [d for d in diags if d.kind == "stranded_in_ready"]
|
||||
assert len(stranded) == 1, f"age={age}"
|
||||
assert stranded[0].severity == expected, (
|
||||
f"age={age} expected {expected}, got {stranded[0].severity}"
|
||||
)
|
||||
|
||||
|
||||
def test_stranded_in_ready_respects_config_override():
|
||||
"""Config override changes the threshold."""
|
||||
now = 100_000
|
||||
task = _task(status="ready", assignee="demo")
|
||||
events = [_event("created", ts=now - 10 * 60)] # 10 min
|
||||
# Default 30 min — wouldn't fire.
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
assert [d for d in diags if d.kind == "stranded_in_ready"] == []
|
||||
# Lower the threshold to 5 min — now it fires.
|
||||
diags = kd.compute_task_diagnostics(
|
||||
task, events, [], now=now,
|
||||
config={"stranded_threshold_seconds": 5 * 60},
|
||||
)
|
||||
stranded = [d for d in diags if d.kind == "stranded_in_ready"]
|
||||
assert len(stranded) == 1
|
||||
|
||||
|
||||
def test_stranded_in_ready_falls_back_to_created_at():
|
||||
"""When events have no ready-transition kind, the rule falls back
|
||||
to the task's ``created_at`` so an ancient stranded task isn't
|
||||
invisible just because its events got pruned."""
|
||||
now = 100_000
|
||||
task = _task(
|
||||
status="ready", assignee="demo", created_at=now - 4 * 3600,
|
||||
)
|
||||
# No qualifying events.
|
||||
events = [_event("commented", ts=now - 100)]
|
||||
diags = kd.compute_task_diagnostics(task, events, [], now=now)
|
||||
stranded = [d for d in diags if d.kind == "stranded_in_ready"]
|
||||
assert len(stranded) == 1
|
||||
assert stranded[0].data["age_seconds"] == 4 * 3600
|
||||
|
||||
|
||||
def test_stranded_in_ready_works_on_real_db_row(kanban_home):
|
||||
"""Round-trip through real kanban_db.connect() — confirms the rule
|
||||
works on sqlite3.Row objects, not just dicts."""
|
||||
import time as _t
|
||||
conn = kb.connect()
|
||||
try:
|
||||
# Create a task and force its created_at into the past.
|
||||
tid = kb.create_task(conn, title="stranded one", assignee="ghost")
|
||||
old_ts = int(_t.time()) - 90 * 60 # 90 min old
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status = 'ready', created_at = ? WHERE id = ?",
|
||||
(old_ts, tid),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
task_row = conn.execute(
|
||||
"SELECT * FROM tasks WHERE id = ?", (tid,)
|
||||
).fetchone()
|
||||
events = list(conn.execute(
|
||||
"SELECT * FROM task_events WHERE task_id = ? ORDER BY created_at",
|
||||
(tid,),
|
||||
).fetchall())
|
||||
# Override created event timestamps too so age calc lines up.
|
||||
conn.execute(
|
||||
"UPDATE task_events SET created_at = ? WHERE task_id = ?",
|
||||
(old_ts, tid),
|
||||
)
|
||||
conn.commit()
|
||||
events = list(conn.execute(
|
||||
"SELECT * FROM task_events WHERE task_id = ?", (tid,),
|
||||
).fetchall())
|
||||
|
||||
diags = kd.compute_task_diagnostics(task_row, events, [])
|
||||
stranded = [d for d in diags if d.kind == "stranded_in_ready"]
|
||||
assert len(stranded) == 1
|
||||
assert stranded[0].data["assignee"] == "ghost"
|
||||
finally:
|
||||
conn.close()
|
||||
|
|
|
|||
|
|
@ -104,6 +104,7 @@ So lane authors don't have to reimplement these:
|
|||
- **Crashed worker** — a worker whose host-local PID has vanished is detected by `detect_crashed_workers` and reaped; the task increments `consecutive_failures` and may auto-block when the breaker trips.
|
||||
- **Run-level retry** — when a task is retried (post-block, post-crash, post-reclaim), the worker can use the `expected_run_id` parameter on terminating tools to fail fast if its own run was already superseded.
|
||||
- **Per-task max runtime** — `task.max_runtime_seconds` hard-caps wall-clock time per run, regardless of PID liveness. Catches genuinely-deadlocked workers that the live-PID extension would otherwise keep running.
|
||||
- **Stranded-task detection** — a ready task whose assignee never produces a claim within `kanban.stranded_threshold_seconds` (default 30 min) shows up in `hermes kanban diagnostics` as a `stranded_in_ready` warning. Severity escalates to error at 2x the threshold and critical at 6x. Catches typo'd assignees, deleted profiles, and down external worker pools in one signal — identity-agnostic, no per-board allowlist to curate.
|
||||
|
||||
## Related
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue