feat(kanban): stale detection for running tasks in dispatcher

Salvages #23790 by @thewillhuang. Adds detect_stale_running() to
the dispatcher cycle. Running tasks that have been started for longer
than dispatch_stale_timeout_seconds (default 14400 = 4h) without a
heartbeat in the last hour are auto-reclaimed to ready.

- New config kanban.dispatch_stale_timeout_seconds (default 14400, 0 disables)
- New 'stale' field on DispatchResult
- detect_stale_running() in kanban_db.py with heartbeat freshness check
- Records outcome='stale' on run close + 'stale' event; ticks failure counter
- Wires config through gateway embedded dispatcher
- Updates _cmd_dispatch verbose/JSON output and daemon logging

Resolved test-file end-of-file conflict by appending both halves.
This commit is contained in:
thewillhuang 2026-05-18 21:20:50 -07:00 committed by Teknium
parent f55d94a1e0
commit e286e68756
6 changed files with 423 additions and 3 deletions

View file

@ -4820,6 +4820,18 @@ class GatewayRunner:
)
failure_limit = _kb.DEFAULT_FAILURE_LIMIT
# Read stale_timeout_seconds — 0 disables stale detection.
raw_stale = kanban_cfg.get("dispatch_stale_timeout_seconds", 0)
try:
stale_timeout_seconds = int(raw_stale or 0)
except (TypeError, ValueError):
logger.warning(
"kanban dispatcher: invalid kanban.dispatch_stale_timeout_seconds=%r; "
"disabling stale detection",
raw_stale,
)
stale_timeout_seconds = 0
# Initial delay so the gateway finishes wiring adapters before the
# dispatcher spawns workers (those workers may hit gateway notify
# subscriptions etc.). Matches the notifier watcher's delay.
@ -4888,6 +4900,7 @@ class GatewayRunner:
max_spawn=max_spawn,
max_in_progress=max_in_progress,
failure_limit=failure_limit,
stale_timeout_seconds=stale_timeout_seconds,
)
except sqlite3.DatabaseError as exc:
if _is_corrupt_board_db_error(exc):

View file

@ -1530,6 +1530,7 @@ DEFAULT_CONFIG = {
# same task/profile (spawn_failed, timed_out, or crashed). Reassignment
# resets the streak for the new profile.
"failure_limit": 2,
<<<<<<< HEAD
# Worker stdout/stderr logs rotate at spawn time. Defaults preserve
# the historical 2 MiB + one-backup behavior; long-running workers can
# raise these to keep more early failure evidence.
@ -1554,6 +1555,14 @@ DEFAULT_CONFIG = {
# large bulk-load of triage tasks from spending a burst of aux
# LLM calls in one tick. Excess tasks defer to the next tick.
"auto_decompose_per_tick": 3,
=======
# Stale detection: running tasks that have exceeded this many
# seconds without a heartbeat (since ``last_heartbeat_at``) are
# auto-reclaimed to ``ready`` on the next dispatcher tick. The
# worker process (if still running host-locally) is terminated
# before the reclaim. 0 disables stale detection entirely.
"dispatch_stale_timeout_seconds": 14400,
>>>>>>> 0b6d673e7 (feat(kanban): stale detection for running tasks in dispatcher)
},
# execute_code settings — controls the tool used for programmatic tool calls.

View file

@ -1894,6 +1894,7 @@ def _cmd_dispatch(args: argparse.Namespace) -> int:
"reclaimed": res.reclaimed,
"crashed": res.crashed,
"timed_out": res.timed_out,
"stale": res.stale,
"auto_blocked": res.auto_blocked,
"promoted": res.promoted,
"spawned": [
@ -1911,6 +1912,9 @@ def _cmd_dispatch(args: argparse.Namespace) -> int:
print(f"Timed out: {len(res.timed_out)}")
if res.timed_out:
print(f" {', '.join(res.timed_out)}")
print(f"Stale: {len(res.stale)}")
if res.stale:
print(f" {', '.join(res.stale)}")
print(f"Auto-blocked: {len(res.auto_blocked)}")
if res.auto_blocked:
print(f" {', '.join(res.auto_blocked)}")
@ -2025,13 +2029,13 @@ def _cmd_daemon(args: argparse.Namespace) -> int:
return
did_work = (
res.reclaimed or res.crashed or res.timed_out or res.promoted
or res.spawned or res.auto_blocked
or res.spawned or res.auto_blocked or res.stale
)
if did_work:
print(
f"[{_fmt_ts(int(time.time()))}] "
f"reclaimed={res.reclaimed} crashed={len(res.crashed)} "
f"timed_out={len(res.timed_out)} "
f"timed_out={len(res.timed_out)} stale={len(res.stale)} "
f"promoted={res.promoted} spawned={len(res.spawned)} "
f"auto_blocked={len(res.auto_blocked)}",
flush=True,

View file

@ -3472,6 +3472,9 @@ class DispatchResult:
"""Task ids auto-blocked by the spawn-failure circuit breaker."""
timed_out: list[str] = field(default_factory=list)
"""Task ids whose workers exceeded ``max_runtime_seconds``."""
stale: list[str] = field(default_factory=list)
"""Task ids reclaimed because no progress (heartbeat) was seen
within ``dispatch_stale_timeout_seconds``."""
# Bounded registry of recently-reaped worker child exits, populated by the
@ -3829,6 +3832,145 @@ def enforce_max_runtime(
return timed_out
# Heartbeat staleness heartbeat gap — if a running task hasn't sent a
# heartbeat in this many seconds it's considered inactive regardless of
# the ``dispatch_stale_timeout_seconds`` threshold. Hardcoded at 1 hour
# to match the original spec (">4h started + no commits in 1h").
_STALE_HEARTBEAT_GAP_SECONDS = 3600
def detect_stale_running(
conn: sqlite3.Connection,
*,
stale_timeout_seconds: int = 0,
signal_fn=None,
) -> list[str]:
"""Reclaim ``running`` tasks that show no progress (heartbeat) within the
staleness window.
A task is considered stale when BOTH of these hold:
1. It has been running for longer than ``stale_timeout_seconds``
(measured from the active run's ``started_at``, falling back to
``tasks.started_at`` on older runs).
2. Its ``last_heartbeat_at`` is older than
``_STALE_HEARTBEAT_GAP_SECONDS`` (or NULL never sent a heartbeat).
On reclaim the task is reset to ``ready``, the run is closed with
``outcome='stale'``, and the host-local worker (if still running) is
terminated.
Only considers ``status='running'`` tasks. Blocked tasks are never
candidates. Returns the list of reclaimed task IDs.
``stale_timeout_seconds=0`` disables the check entirely (returns ``[]``
immediately). ``signal_fn`` is a test hook; defaults to ``os.kill``
on POSIX.
"""
if stale_timeout_seconds <= 0:
return []
import signal as _signal_mod
now = int(time.time())
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
reclaimed: list[str] = []
rows = conn.execute(
"SELECT t.id, t.worker_pid, t.last_heartbeat_at, t.claim_lock, "
" COALESCE(r.started_at, t.started_at) AS active_started_at "
"FROM tasks t "
"LEFT JOIN task_runs r ON r.id = t.current_run_id "
"WHERE t.status = 'running'"
).fetchall()
for row in rows:
# Skip if no started_at (shouldn't happen for running, but be safe).
if row["active_started_at"] is None:
continue
elapsed = now - int(row["active_started_at"])
if elapsed < stale_timeout_seconds:
continue # not old enough to check
last_hb = row["last_heartbeat_at"]
hb_age = (now - int(last_hb)) if last_hb is not None else None
if hb_age is not None and hb_age < _STALE_HEARTBEAT_GAP_SECONDS:
continue # recent heartbeat → still alive
pid = row["worker_pid"]
tid = row["id"]
lock = row["claim_lock"] or ""
# Terminate the worker if it's still host-local.
termination = _terminate_reclaimed_worker(
pid, lock, signal_fn=signal_fn,
)
with write_txn(conn):
cur = conn.execute(
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL, "
"last_heartbeat_at = NULL "
"WHERE id = ? AND status = 'running'",
(tid,),
)
if cur.rowcount != 1:
continue
payload = {
"elapsed_seconds": int(elapsed),
"last_heartbeat_at": (
int(last_hb) if last_hb is not None else None
),
"heartbeat_age_seconds": (
int(hb_age) if hb_age is not None else None
),
"timeout_seconds": stale_timeout_seconds,
"pid": int(pid) if pid else None,
}
payload.update(termination)
run_id = _end_run(
conn, tid,
outcome="stale", status="stale",
error=(
f"no heartbeat for {int(hb_age)}s "
if hb_age is not None
else "no heartbeat ever"
) + f" after {int(elapsed)}s running",
metadata=payload,
)
_append_event(
conn, tid, "stale", payload, run_id=run_id,
)
reclaimed.append(tid)
# Increment failure counter. The task is already ``ready`` and the
# run is already closed; this just ticks the counter and may trip
# the circuit breaker.
_record_task_failure(
conn, tid,
error=(
f"no heartbeat for {int(hb_age)}s "
if hb_age is not None
else "no heartbeat ever"
) + f" after {int(elapsed)}s running",
outcome="stale",
release_claim=False,
end_run=False,
event_payload_extra={
"elapsed_seconds": int(elapsed),
"heartbeat_age_seconds": (
int(hb_age) if hb_age is not None else None
),
"timeout_seconds": stale_timeout_seconds,
},
)
return reclaimed
def set_max_runtime(
conn: sqlite3.Connection,
task_id: str,
@ -4274,13 +4416,15 @@ def dispatch_once(
max_spawn: Optional[int] = None,
max_in_progress: Optional[int] = None,
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
stale_timeout_seconds: int = 0,
board: Optional[str] = None,
) -> DispatchResult:
"""Run one dispatcher tick.
Steps:
1. Reclaim stale running tasks (TTL expired).
2. Reclaim crashed running tasks (host-local PID no longer alive).
2. Reclaim stale running tasks (no recent heartbeat).
3. Reclaim crashed running tasks (host-local PID no longer alive).
3. Promote todo -> ready where all parents are done.
4. For each ready task with an assignee, atomically claim and call
``spawn_fn(task, workspace_path, board) -> Optional[int]``. The
@ -4338,6 +4482,9 @@ def dispatch_once(
result = DispatchResult()
result.reclaimed = release_stale_claims(conn)
result.stale = detect_stale_running(
conn, stale_timeout_seconds=stale_timeout_seconds,
)
result.crashed = detect_crashed_workers(conn)
# detect_crashed_workers stashes protocol-violation auto-blocks on
# itself so the public list-return stays stable. Pull them into the

View file

@ -4344,3 +4344,66 @@ def test_reclaim_task_clears_failure_counter(kanban_home):
assert task.status == "ready"
finally:
conn.close()
def test_dispatch_once_integrates_stale_detection(kanban_home, monkeypatch):
"""dispatch_once with stale_timeout_seconds reclaims stale running tasks."""
import hermes_cli.kanban_db as _kb
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
with kb.connect() as conn:
t = kb.create_task(conn, title="stale-dispatch", assignee="worker")
kb.claim_task(conn, t)
kb._set_worker_pid(conn, t, 99999) # fake PID — avoid killing test
five_hours_ago = int(time.time()) - (5 * 3600)
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(five_hours_ago, t),
)
res = kb.dispatch_once(
conn,
spawn_fn=lambda tsk, ws: None,
stale_timeout_seconds=14400,
)
assert t in res.stale, "Stale task should appear in result.stale"
assert kb.get_task(conn, t).status == "ready"
def test_dispatch_once_stale_disabled_when_timeout_zero(kanban_home, monkeypatch):
"""dispatch_once with stale_timeout_seconds=0 skips stale detection."""
# Use os.getpid() so _pid_alive → True, preventing detect_crashed_workers
# from reclaiming. Only stale detection (disabled via timeout=0) is tested.
with kb.connect() as conn:
t = kb.create_task(conn, title="skip-stale", assignee="worker")
kb.claim_task(conn, t)
# Claim sets worker_pid to 0 initially. Set it to os.getpid() so the
# crash detector sees a live PID and skips it.
kb._set_worker_pid(conn, t, os.getpid())
five_hours_ago = int(time.time()) - (5 * 3600)
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(five_hours_ago, t),
)
res = kb.dispatch_once(
conn,
spawn_fn=lambda tsk, ws: None,
stale_timeout_seconds=0,
)
assert res.stale == [], "stale_timeout_seconds=0 should disable detection"
assert kb.get_task(conn, t).status == "running"

View file

@ -2232,3 +2232,187 @@ def test_dispatch_review_does_not_claim_ready_tasks(
# claim_review_task should NOT claim a ready task
claimed = kb.claim_review_task(conn, t)
assert claimed is None
# Stale detection — detect_stale_running
# ---------------------------------------------------------------------------
def test_detect_stale_returns_running_task_with_no_heartbeat(kanban_home, monkeypatch):
"""A task running > timeout with zero heartbeats gets reclaimed as stale."""
import hermes_cli.kanban_db as _kb
with kb.connect() as conn:
t = kb.create_task(conn, title="stale-no-hb", assignee="worker")
kb.claim_task(conn, t)
kb._set_worker_pid(conn, t, os.getpid())
# Rewind started_at so the task appears to have been running for 5 hours.
five_hours_ago = int(time.time()) - (5 * 3600)
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(five_hours_ago, t),
)
# No heartbeat set — last_heartbeat_at stays NULL.
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
killed = []
stale = kb.detect_stale_running(
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: killed.append(s),
)
assert t in stale, "Task with no heartbeat for >4h should be reclaimed"
task = kb.get_task(conn, t)
assert task.status == "ready"
def test_detect_stale_returns_task_with_stale_heartbeat(kanban_home, monkeypatch):
"""A task running > timeout with a heartbeat older than 1h gets reclaimed."""
import hermes_cli.kanban_db as _kb
with kb.connect() as conn:
t = kb.create_task(conn, title="stale-hb", assignee="worker")
kb.claim_task(conn, t)
kb._set_worker_pid(conn, t, os.getpid())
five_hours_ago = int(time.time()) - (5 * 3600)
heartbeat_2h_ago = int(time.time()) - (2 * 3600)
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ?, last_heartbeat_at = ? "
"WHERE id = ?",
(five_hours_ago, heartbeat_2h_ago, t),
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(five_hours_ago, t),
)
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
stale = kb.detect_stale_running(
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
)
assert t in stale, (
"Task with heartbeat >1h old and started >4h ago should be stale"
)
assert kb.get_task(conn, t).status == "ready"
def test_detect_stale_skips_task_with_recent_heartbeat(kanban_home, monkeypatch):
"""A task running > timeout but with a recent heartbeat is NOT reclaimed."""
import hermes_cli.kanban_db as _kb
with kb.connect() as conn:
t = kb.create_task(conn, title="alive-hb", assignee="worker")
kb.claim_task(conn, t)
kb._set_worker_pid(conn, t, os.getpid())
five_hours_ago = int(time.time()) - (5 * 3600)
heartbeat_now = int(time.time()) # heartbeat just happened
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ?, last_heartbeat_at = ? "
"WHERE id = ?",
(five_hours_ago, heartbeat_now, t),
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(five_hours_ago, t),
)
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
stale = kb.detect_stale_running(
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
)
assert stale == [], "Task with recent heartbeat should not be reclaimed"
assert kb.get_task(conn, t).status == "running"
def test_detect_stale_skips_recently_started_task(kanban_home, monkeypatch):
"""A task started < timeout ago is NOT reclaimed even with no heartbeat."""
import hermes_cli.kanban_db as _kb
with kb.connect() as conn:
t = kb.create_task(conn, title="fresh", assignee="worker")
kb.claim_task(conn, t)
kb._set_worker_pid(conn, t, os.getpid())
# Started only 1 hour ago — well within the 4h threshold.
one_hour_ago = int(time.time()) - 3600
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?", (one_hour_ago, t)
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(one_hour_ago, t),
)
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
stale = kb.detect_stale_running(
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
)
assert stale == [], "Task started <4h ago should not be reclaimed"
assert kb.get_task(conn, t).status == "running"
def test_detect_stale_skips_when_timeout_zero(kanban_home, monkeypatch):
"""stale_timeout_seconds=0 disables stale detection entirely."""
import hermes_cli.kanban_db as _kb
with kb.connect() as conn:
t = kb.create_task(conn, title="disabled", assignee="worker")
kb.claim_task(conn, t)
kb._set_worker_pid(conn, t, os.getpid())
five_hours_ago = int(time.time()) - (5 * 3600)
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(five_hours_ago, t),
)
stale = kb.detect_stale_running(
conn, stale_timeout_seconds=0, signal_fn=lambda p, s: None,
)
assert stale == [], "timeout=0 should disable stale detection"
assert kb.get_task(conn, t).status == "running"
def test_detect_stale_skips_blocked_tasks(kanban_home, monkeypatch):
"""Blocked tasks are NOT reclaimed by stale detection."""
import hermes_cli.kanban_db as _kb
with kb.connect() as conn:
t = kb.create_task(conn, title="blocked-task", assignee="worker")
kb.claim_task(conn, t)
kb._set_worker_pid(conn, t, os.getpid())
five_hours_ago = int(time.time()) - (5 * 3600)
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t)
)
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(five_hours_ago, t),
)
# Block the task explicitly.
kb.block_task(conn, t, reason="human requested block")
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
stale = kb.detect_stale_running(
conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None,
)
assert stale == [], "Blocked task should not be reclaimed by stale detection"
assert kb.get_task(conn, t).status == "blocked"