From 88588b6159953407420d3778c4a5e87fecd0a30f Mon Sep 17 00:00:00 2001 From: konsisumer <11262660+konsisumer@users.noreply.github.com> Date: Sun, 10 May 2026 09:52:29 +0200 Subject: [PATCH] fix(kanban): extend stale claim instead of killing live worker Workers running slow models (e.g. kimi-k2.6) can spend longer than DEFAULT_CLAIM_TTL_SECONDS inside a single tool-free LLM call, making no tool calls and therefore not heartbeating. release_stale_claims previously reclaimed these healthy workers, producing the spawn-then-immediately-reclaim loop reported in #23025. When a stale-by-TTL claim's host-local worker PID is still alive, extend the claim (emit a claim_extended event) rather than killing it. enforce_max_runtime / detect_crashed_workers remain the upper bounds for genuinely wedged or dead workers. Reclaim events now also record claim_expires, last_heartbeat_at, worker_pid, and host_local so operators can see why a worker was killed. --- hermes_cli/kanban_db.py | 74 ++++++++++++++++++++++-- tests/hermes_cli/test_kanban_db.py | 90 ++++++++++++++++++++++++++++-- 2 files changed, 155 insertions(+), 9 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 1e50b97dd68..a57b27ce5f5 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1997,16 +1997,69 @@ def release_stale_claims( ) -> int: """Reset any ``running`` task whose claim has expired. - Returns the number of stale claims reclaimed. Safe to call often. + A stale-by-TTL claim whose host-local worker PID is still alive is + *extended* (with a ``claim_extended`` event) instead of being + reclaimed. Reclaiming a live worker mid-flight produces the spawn- + then-immediately-reclaim loop seen on slow models that spend longer + than ``DEFAULT_CLAIM_TTL_SECONDS`` inside a single tool-free LLM + call (#23025): no tool calls means no ``kanban_heartbeat``, even + though the subprocess is healthy. ``enforce_max_runtime`` and + ``detect_crashed_workers`` remain the upper bounds for genuinely + wedged or dead workers. + + Returns the number of stale claims actually reclaimed (live-pid + extensions don't count). Safe to call often. """ now = int(time.time()) reclaimed = 0 + host_prefix = f"{_claimer_id().split(':', 1)[0]}:" stale = conn.execute( - "SELECT id, claim_lock, worker_pid FROM tasks " - "WHERE status = 'running' AND claim_expires IS NOT NULL AND claim_expires < ?", + "SELECT id, claim_lock, worker_pid, claim_expires, last_heartbeat_at " + "FROM tasks " + "WHERE status = 'running' AND claim_expires IS NOT NULL " + " AND claim_expires < ?", (now,), ).fetchall() for row in stale: + lock = row["claim_lock"] or "" + host_local = lock.startswith(host_prefix) + if host_local and row["worker_pid"] and _pid_alive(row["worker_pid"]): + new_expires = now + int(DEFAULT_CLAIM_TTL_SECONDS) + with write_txn(conn): + cur = conn.execute( + "UPDATE tasks SET claim_expires = ? " + "WHERE id = ? AND status = 'running' " + " AND claim_lock IS ? " + " AND claim_expires IS NOT NULL " + " AND claim_expires < ?", + (new_expires, row["id"], row["claim_lock"], now), + ) + if cur.rowcount != 1: + continue + run_id = _current_run_id(conn, row["id"]) + if run_id is not None: + conn.execute( + "UPDATE task_runs SET claim_expires = ? WHERE id = ?", + (new_expires, run_id), + ) + _append_event( + conn, row["id"], "claim_extended", + { + "reason": "pid_alive", + "worker_pid": int(row["worker_pid"]), + "claim_lock": row["claim_lock"], + "claim_expires_was": int(row["claim_expires"]), + "claim_expires_now": new_expires, + "last_heartbeat_at": ( + int(row["last_heartbeat_at"]) + if row["last_heartbeat_at"] is not None + else None + ), + }, + run_id=run_id, + ) + continue + termination = _terminate_reclaimed_worker( row["worker_pid"], row["claim_lock"], signal_fn=signal_fn, ) @@ -2026,7 +2079,20 @@ def release_stale_claims( error=f"stale_lock={row['claim_lock']}", metadata=termination, ) - payload = {"stale_lock": row["claim_lock"]} + payload = { + "stale_lock": row["claim_lock"], + "worker_pid": ( + int(row["worker_pid"]) + if row["worker_pid"] is not None else None + ), + "claim_expires": int(row["claim_expires"]), + "last_heartbeat_at": ( + int(row["last_heartbeat_at"]) + if row["last_heartbeat_at"] is not None else None + ), + "now": now, + "host_local": host_local, + } payload.update(termination) _append_event( conn, row["id"], "reclaimed", diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index a00f0b2d588..fb1bdbf0cf6 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -177,12 +177,9 @@ def test_stale_claim_reclaimed(kanban_home, monkeypatch): host = _kb._claimer_id().split(":", 1)[0] kb.claim_task(conn, t, claimer=f"{host}:worker") killed: list[int] = [] - state = {"alive": True} - def _signal(pid, sig): + def _signal(_pid, sig): killed.append(sig) - if sig == signal.SIGTERM: - state["alive"] = False kb._set_worker_pid(conn, t, 12345) # Rewind claim_expires so it looks stale. @@ -190,13 +187,96 @@ def test_stale_claim_reclaimed(kanban_home, monkeypatch): "UPDATE tasks SET claim_expires = ? WHERE id = ?", (int(time.time()) - 3600, t), ) - monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: state["alive"]) + # Worker PID has died — exactly the case ``release_stale_claims`` + # should still reclaim (post-#23025: live PIDs are now extended). + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) reclaimed = kb.release_stale_claims(conn, signal_fn=_signal) assert reclaimed == 1 assert kb.get_task(conn, t).status == "ready" assert killed == [signal.SIGTERM] +def test_stale_claim_with_live_pid_extends_instead_of_reclaiming( + kanban_home, monkeypatch, +): + """A stale-by-TTL claim whose worker PID is still alive should be + extended, not reclaimed (#23025). Slow models can spend longer than + ``DEFAULT_CLAIM_TTL_SECONDS`` inside a single tool-free LLM call; + killing those healthy workers produces a respawn loop with zero + progress.""" + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="x", assignee="a") + host = _kb._claimer_id().split(":", 1)[0] + kb.claim_task(conn, t, claimer=f"{host}:worker") + kb._set_worker_pid(conn, t, 12345) + + old_expires = int(time.time()) - 60 + conn.execute( + "UPDATE tasks SET claim_expires = ? WHERE id = ?", + (old_expires, t), + ) + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True) + killed: list[int] = [] + reclaimed = kb.release_stale_claims( + conn, signal_fn=lambda _p, sig: killed.append(sig), + ) + assert reclaimed == 0 + task = kb.get_task(conn, t) + assert task.status == "running" + assert task.claim_expires is not None + assert task.claim_expires > old_expires + assert killed == [] # live worker not killed + + kinds = [ + r["kind"] for r in conn.execute( + "SELECT kind FROM task_events WHERE task_id = ?", (t,), + ).fetchall() + ] + assert "claim_extended" in kinds + assert "reclaimed" not in kinds + + +def test_stale_claim_reclaim_event_records_diagnostic_payload( + kanban_home, monkeypatch, +): + """``reclaimed`` events should carry claim_expires, last_heartbeat_at, + and worker_pid so operators can diagnose why a claim went stale + (#23025: previous payload only had ``stale_lock`` which gives no + timing context).""" + import json + import hermes_cli.kanban_db as _kb + + with kb.connect() as conn: + t = kb.create_task(conn, title="x", assignee="a") + host = _kb._claimer_id().split(":", 1)[0] + kb.claim_task(conn, t, claimer=f"{host}:worker") + kb._set_worker_pid(conn, t, 12345) + old_expires = int(time.time()) - 3600 + hb_at = int(time.time()) - 1800 + conn.execute( + "UPDATE tasks SET claim_expires = ?, last_heartbeat_at = ? " + "WHERE id = ?", + (old_expires, hb_at, t), + ) + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + kb.release_stale_claims(conn, signal_fn=lambda _p, _s: None) + row = conn.execute( + "SELECT payload FROM task_events " + "WHERE task_id = ? AND kind = 'reclaimed'", + (t,), + ).fetchone() + assert row is not None + payload = json.loads(row["payload"]) + assert payload["claim_expires"] == old_expires + assert payload["last_heartbeat_at"] == hb_at + assert payload["worker_pid"] == 12345 + assert payload["host_local"] is True + + def test_max_runtime_uses_current_run_start_after_retry(kanban_home): """A retry should get a fresh max-runtime window.