From e286e6875678711ee38353852ec201ea727e5284 Mon Sep 17 00:00:00 2001 From: thewillhuang <7867215+thewillhuang@users.noreply.github.com> Date: Mon, 18 May 2026 21:20:50 -0700 Subject: [PATCH] feat(kanban): stale detection for running tasks in dispatcher Salvages #23790 by @thewillhuang. Adds detect_stale_running() to the dispatcher cycle. Running tasks that have been started for longer than dispatch_stale_timeout_seconds (default 14400 = 4h) without a heartbeat in the last hour are auto-reclaimed to ready. - New config kanban.dispatch_stale_timeout_seconds (default 14400, 0 disables) - New 'stale' field on DispatchResult - detect_stale_running() in kanban_db.py with heartbeat freshness check - Records outcome='stale' on run close + 'stale' event; ticks failure counter - Wires config through gateway embedded dispatcher - Updates _cmd_dispatch verbose/JSON output and daemon logging Resolved test-file end-of-file conflict by appending both halves. --- gateway/run.py | 13 ++ hermes_cli/config.py | 9 + hermes_cli/kanban.py | 8 +- hermes_cli/kanban_db.py | 149 +++++++++++++- .../test_kanban_core_functionality.py | 63 ++++++ tests/hermes_cli/test_kanban_db.py | 184 ++++++++++++++++++ 6 files changed, 423 insertions(+), 3 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 03859a787f6..f8d0b724636 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4820,6 +4820,18 @@ class GatewayRunner: ) failure_limit = _kb.DEFAULT_FAILURE_LIMIT + # Read stale_timeout_seconds — 0 disables stale detection. + raw_stale = kanban_cfg.get("dispatch_stale_timeout_seconds", 0) + try: + stale_timeout_seconds = int(raw_stale or 0) + except (TypeError, ValueError): + logger.warning( + "kanban dispatcher: invalid kanban.dispatch_stale_timeout_seconds=%r; " + "disabling stale detection", + raw_stale, + ) + stale_timeout_seconds = 0 + # Initial delay so the gateway finishes wiring adapters before the # dispatcher spawns workers (those workers may hit gateway notify # subscriptions etc.). Matches the notifier watcher's delay. @@ -4888,6 +4900,7 @@ class GatewayRunner: max_spawn=max_spawn, max_in_progress=max_in_progress, failure_limit=failure_limit, + stale_timeout_seconds=stale_timeout_seconds, ) except sqlite3.DatabaseError as exc: if _is_corrupt_board_db_error(exc): diff --git a/hermes_cli/config.py b/hermes_cli/config.py index ce3ddd54108..c48ec10db00 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1530,6 +1530,7 @@ DEFAULT_CONFIG = { # same task/profile (spawn_failed, timed_out, or crashed). Reassignment # resets the streak for the new profile. "failure_limit": 2, +<<<<<<< HEAD # Worker stdout/stderr logs rotate at spawn time. Defaults preserve # the historical 2 MiB + one-backup behavior; long-running workers can # raise these to keep more early failure evidence. @@ -1554,6 +1555,14 @@ DEFAULT_CONFIG = { # large bulk-load of triage tasks from spending a burst of aux # LLM calls in one tick. Excess tasks defer to the next tick. "auto_decompose_per_tick": 3, +======= + # Stale detection: running tasks that have exceeded this many + # seconds without a heartbeat (since ``last_heartbeat_at``) are + # auto-reclaimed to ``ready`` on the next dispatcher tick. The + # worker process (if still running host-locally) is terminated + # before the reclaim. 0 disables stale detection entirely. + "dispatch_stale_timeout_seconds": 14400, +>>>>>>> 0b6d673e7 (feat(kanban): stale detection for running tasks in dispatcher) }, # execute_code settings — controls the tool used for programmatic tool calls. diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 01bdd3d8574..c389fe02b00 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -1894,6 +1894,7 @@ def _cmd_dispatch(args: argparse.Namespace) -> int: "reclaimed": res.reclaimed, "crashed": res.crashed, "timed_out": res.timed_out, + "stale": res.stale, "auto_blocked": res.auto_blocked, "promoted": res.promoted, "spawned": [ @@ -1911,6 +1912,9 @@ def _cmd_dispatch(args: argparse.Namespace) -> int: print(f"Timed out: {len(res.timed_out)}") if res.timed_out: print(f" {', '.join(res.timed_out)}") + print(f"Stale: {len(res.stale)}") + if res.stale: + print(f" {', '.join(res.stale)}") print(f"Auto-blocked: {len(res.auto_blocked)}") if res.auto_blocked: print(f" {', '.join(res.auto_blocked)}") @@ -2025,13 +2029,13 @@ def _cmd_daemon(args: argparse.Namespace) -> int: return did_work = ( res.reclaimed or res.crashed or res.timed_out or res.promoted - or res.spawned or res.auto_blocked + or res.spawned or res.auto_blocked or res.stale ) if did_work: print( f"[{_fmt_ts(int(time.time()))}] " f"reclaimed={res.reclaimed} crashed={len(res.crashed)} " - f"timed_out={len(res.timed_out)} " + f"timed_out={len(res.timed_out)} stale={len(res.stale)} " f"promoted={res.promoted} spawned={len(res.spawned)} " f"auto_blocked={len(res.auto_blocked)}", flush=True, diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 00beaaccd56..00ecca049fd 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -3472,6 +3472,9 @@ class DispatchResult: """Task ids auto-blocked by the spawn-failure circuit breaker.""" timed_out: list[str] = field(default_factory=list) """Task ids whose workers exceeded ``max_runtime_seconds``.""" + stale: list[str] = field(default_factory=list) + """Task ids reclaimed because no progress (heartbeat) was seen + within ``dispatch_stale_timeout_seconds``.""" # Bounded registry of recently-reaped worker child exits, populated by the @@ -3829,6 +3832,145 @@ def enforce_max_runtime( return timed_out +# Heartbeat staleness heartbeat gap — if a running task hasn't sent a +# heartbeat in this many seconds it's considered inactive regardless of +# the ``dispatch_stale_timeout_seconds`` threshold. Hardcoded at 1 hour +# to match the original spec (">4h started + no commits in 1h"). +_STALE_HEARTBEAT_GAP_SECONDS = 3600 + + +def detect_stale_running( + conn: sqlite3.Connection, + *, + stale_timeout_seconds: int = 0, + signal_fn=None, +) -> list[str]: + """Reclaim ``running`` tasks that show no progress (heartbeat) within the + staleness window. + + A task is considered stale when BOTH of these hold: + + 1. It has been running for longer than ``stale_timeout_seconds`` + (measured from the active run's ``started_at``, falling back to + ``tasks.started_at`` on older runs). + 2. Its ``last_heartbeat_at`` is older than + ``_STALE_HEARTBEAT_GAP_SECONDS`` (or NULL — never sent a heartbeat). + + On reclaim the task is reset to ``ready``, the run is closed with + ``outcome='stale'``, and the host-local worker (if still running) is + terminated. + + Only considers ``status='running'`` tasks. Blocked tasks are never + candidates. Returns the list of reclaimed task IDs. + + ``stale_timeout_seconds=0`` disables the check entirely (returns ``[]`` + immediately). ``signal_fn`` is a test hook; defaults to ``os.kill`` + on POSIX. + """ + if stale_timeout_seconds <= 0: + return [] + + import signal as _signal_mod + + now = int(time.time()) + host_prefix = f"{_claimer_id().split(':', 1)[0]}:" + reclaimed: list[str] = [] + + rows = conn.execute( + "SELECT t.id, t.worker_pid, t.last_heartbeat_at, t.claim_lock, " + " COALESCE(r.started_at, t.started_at) AS active_started_at " + "FROM tasks t " + "LEFT JOIN task_runs r ON r.id = t.current_run_id " + "WHERE t.status = 'running'" + ).fetchall() + + for row in rows: + # Skip if no started_at (shouldn't happen for running, but be safe). + if row["active_started_at"] is None: + continue + + elapsed = now - int(row["active_started_at"]) + if elapsed < stale_timeout_seconds: + continue # not old enough to check + + last_hb = row["last_heartbeat_at"] + hb_age = (now - int(last_hb)) if last_hb is not None else None + if hb_age is not None and hb_age < _STALE_HEARTBEAT_GAP_SECONDS: + continue # recent heartbeat → still alive + + pid = row["worker_pid"] + tid = row["id"] + lock = row["claim_lock"] or "" + + # Terminate the worker if it's still host-local. + termination = _terminate_reclaimed_worker( + pid, lock, signal_fn=signal_fn, + ) + + with write_txn(conn): + cur = conn.execute( + "UPDATE tasks SET status = 'ready', claim_lock = NULL, " + "claim_expires = NULL, worker_pid = NULL, " + "last_heartbeat_at = NULL " + "WHERE id = ? AND status = 'running'", + (tid,), + ) + if cur.rowcount != 1: + continue + + payload = { + "elapsed_seconds": int(elapsed), + "last_heartbeat_at": ( + int(last_hb) if last_hb is not None else None + ), + "heartbeat_age_seconds": ( + int(hb_age) if hb_age is not None else None + ), + "timeout_seconds": stale_timeout_seconds, + "pid": int(pid) if pid else None, + } + payload.update(termination) + + run_id = _end_run( + conn, tid, + outcome="stale", status="stale", + error=( + f"no heartbeat for {int(hb_age)}s " + if hb_age is not None + else "no heartbeat ever" + ) + f" after {int(elapsed)}s running", + metadata=payload, + ) + _append_event( + conn, tid, "stale", payload, run_id=run_id, + ) + reclaimed.append(tid) + + # Increment failure counter. The task is already ``ready`` and the + # run is already closed; this just ticks the counter and may trip + # the circuit breaker. + _record_task_failure( + conn, tid, + error=( + f"no heartbeat for {int(hb_age)}s " + if hb_age is not None + else "no heartbeat ever" + ) + f" after {int(elapsed)}s running", + outcome="stale", + release_claim=False, + end_run=False, + event_payload_extra={ + "elapsed_seconds": int(elapsed), + "heartbeat_age_seconds": ( + int(hb_age) if hb_age is not None else None + ), + "timeout_seconds": stale_timeout_seconds, + }, + ) + + return reclaimed + + def set_max_runtime( conn: sqlite3.Connection, task_id: str, @@ -4274,13 +4416,15 @@ def dispatch_once( max_spawn: Optional[int] = None, max_in_progress: Optional[int] = None, failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, + stale_timeout_seconds: int = 0, board: Optional[str] = None, ) -> DispatchResult: """Run one dispatcher tick. Steps: 1. Reclaim stale running tasks (TTL expired). - 2. Reclaim crashed running tasks (host-local PID no longer alive). + 2. Reclaim stale running tasks (no recent heartbeat). + 3. Reclaim crashed running tasks (host-local PID no longer alive). 3. Promote todo -> ready where all parents are done. 4. For each ready task with an assignee, atomically claim and call ``spawn_fn(task, workspace_path, board) -> Optional[int]``. The @@ -4338,6 +4482,9 @@ def dispatch_once( result = DispatchResult() result.reclaimed = release_stale_claims(conn) + result.stale = detect_stale_running( + conn, stale_timeout_seconds=stale_timeout_seconds, + ) result.crashed = detect_crashed_workers(conn) # detect_crashed_workers stashes protocol-violation auto-blocks on # itself so the public list-return stays stable. Pull them into the diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index 385d91b304a..d6bb12a57f0 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -4344,3 +4344,66 @@ def test_reclaim_task_clears_failure_counter(kanban_home): assert task.status == "ready" finally: conn.close() + + +def test_dispatch_once_integrates_stale_detection(kanban_home, monkeypatch): + """dispatch_once with stale_timeout_seconds reclaims stale running tasks.""" + import hermes_cli.kanban_db as _kb + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + + with kb.connect() as conn: + t = kb.create_task(conn, title="stale-dispatch", assignee="worker") + kb.claim_task(conn, t) + kb._set_worker_pid(conn, t, 99999) # fake PID — avoid killing test + + five_hours_ago = int(time.time()) - (5 * 3600) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t) + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (five_hours_ago, t), + ) + + res = kb.dispatch_once( + conn, + spawn_fn=lambda tsk, ws: None, + stale_timeout_seconds=14400, + ) + assert t in res.stale, "Stale task should appear in result.stale" + assert kb.get_task(conn, t).status == "ready" + + +def test_dispatch_once_stale_disabled_when_timeout_zero(kanban_home, monkeypatch): + """dispatch_once with stale_timeout_seconds=0 skips stale detection.""" + # Use os.getpid() so _pid_alive → True, preventing detect_crashed_workers + # from reclaiming. Only stale detection (disabled via timeout=0) is tested. + + with kb.connect() as conn: + t = kb.create_task(conn, title="skip-stale", assignee="worker") + kb.claim_task(conn, t) + # Claim sets worker_pid to 0 initially. Set it to os.getpid() so the + # crash detector sees a live PID and skips it. + kb._set_worker_pid(conn, t, os.getpid()) + + five_hours_ago = int(time.time()) - (5 * 3600) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t) + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (five_hours_ago, t), + ) + + res = kb.dispatch_once( + conn, + spawn_fn=lambda tsk, ws: None, + stale_timeout_seconds=0, + ) + assert res.stale == [], "stale_timeout_seconds=0 should disable detection" + assert kb.get_task(conn, t).status == "running" diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index b6d60173e9d..16159e606ce 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -2232,3 +2232,187 @@ def test_dispatch_review_does_not_claim_ready_tasks( # claim_review_task should NOT claim a ready task claimed = kb.claim_review_task(conn, t) assert claimed is None + +# Stale detection — detect_stale_running +# --------------------------------------------------------------------------- + +def test_detect_stale_returns_running_task_with_no_heartbeat(kanban_home, monkeypatch): + """A task running > timeout with zero heartbeats gets reclaimed as stale.""" + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="stale-no-hb", assignee="worker") + kb.claim_task(conn, t) + kb._set_worker_pid(conn, t, os.getpid()) + + # Rewind started_at so the task appears to have been running for 5 hours. + five_hours_ago = int(time.time()) - (5 * 3600) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t) + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (five_hours_ago, t), + ) + # No heartbeat set — last_heartbeat_at stays NULL. + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + killed = [] + stale = kb.detect_stale_running( + conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: killed.append(s), + ) + assert t in stale, "Task with no heartbeat for >4h should be reclaimed" + task = kb.get_task(conn, t) + assert task.status == "ready" + + +def test_detect_stale_returns_task_with_stale_heartbeat(kanban_home, monkeypatch): + """A task running > timeout with a heartbeat older than 1h gets reclaimed.""" + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="stale-hb", assignee="worker") + kb.claim_task(conn, t) + kb._set_worker_pid(conn, t, os.getpid()) + + five_hours_ago = int(time.time()) - (5 * 3600) + heartbeat_2h_ago = int(time.time()) - (2 * 3600) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ?, last_heartbeat_at = ? " + "WHERE id = ?", + (five_hours_ago, heartbeat_2h_ago, t), + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (five_hours_ago, t), + ) + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + stale = kb.detect_stale_running( + conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None, + ) + assert t in stale, ( + "Task with heartbeat >1h old and started >4h ago should be stale" + ) + assert kb.get_task(conn, t).status == "ready" + + +def test_detect_stale_skips_task_with_recent_heartbeat(kanban_home, monkeypatch): + """A task running > timeout but with a recent heartbeat is NOT reclaimed.""" + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="alive-hb", assignee="worker") + kb.claim_task(conn, t) + kb._set_worker_pid(conn, t, os.getpid()) + + five_hours_ago = int(time.time()) - (5 * 3600) + heartbeat_now = int(time.time()) # heartbeat just happened + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ?, last_heartbeat_at = ? " + "WHERE id = ?", + (five_hours_ago, heartbeat_now, t), + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (five_hours_ago, t), + ) + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True) + stale = kb.detect_stale_running( + conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None, + ) + assert stale == [], "Task with recent heartbeat should not be reclaimed" + assert kb.get_task(conn, t).status == "running" + + +def test_detect_stale_skips_recently_started_task(kanban_home, monkeypatch): + """A task started < timeout ago is NOT reclaimed even with no heartbeat.""" + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="fresh", assignee="worker") + kb.claim_task(conn, t) + kb._set_worker_pid(conn, t, os.getpid()) + + # Started only 1 hour ago — well within the 4h threshold. + one_hour_ago = int(time.time()) - 3600 + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ? WHERE id = ?", (one_hour_ago, t) + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (one_hour_ago, t), + ) + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True) + stale = kb.detect_stale_running( + conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None, + ) + assert stale == [], "Task started <4h ago should not be reclaimed" + assert kb.get_task(conn, t).status == "running" + + +def test_detect_stale_skips_when_timeout_zero(kanban_home, monkeypatch): + """stale_timeout_seconds=0 disables stale detection entirely.""" + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="disabled", assignee="worker") + kb.claim_task(conn, t) + kb._set_worker_pid(conn, t, os.getpid()) + + five_hours_ago = int(time.time()) - (5 * 3600) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t) + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (five_hours_ago, t), + ) + + stale = kb.detect_stale_running( + conn, stale_timeout_seconds=0, signal_fn=lambda p, s: None, + ) + assert stale == [], "timeout=0 should disable stale detection" + assert kb.get_task(conn, t).status == "running" + + +def test_detect_stale_skips_blocked_tasks(kanban_home, monkeypatch): + """Blocked tasks are NOT reclaimed by stale detection.""" + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="blocked-task", assignee="worker") + kb.claim_task(conn, t) + kb._set_worker_pid(conn, t, os.getpid()) + + five_hours_ago = int(time.time()) - (5 * 3600) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET started_at = ? WHERE id = ?", (five_hours_ago, t) + ) + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (five_hours_ago, t), + ) + # Block the task explicitly. + kb.block_task(conn, t, reason="human requested block") + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + stale = kb.detect_stale_running( + conn, stale_timeout_seconds=14400, signal_fn=lambda p, s: None, + ) + assert stale == [], "Blocked task should not be reclaimed by stale detection" + assert kb.get_task(conn, t).status == "blocked"