mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(kanban): extend stale claim instead of killing live worker
Workers running slow models (e.g. kimi-k2.6) can spend longer than DEFAULT_CLAIM_TTL_SECONDS inside a single tool-free LLM call, making no tool calls and therefore not heartbeating. release_stale_claims previously reclaimed these healthy workers, producing the spawn-then-immediately-reclaim loop reported in #23025. When a stale-by-TTL claim's host-local worker PID is still alive, extend the claim (emit a claim_extended event) rather than killing it. enforce_max_runtime / detect_crashed_workers remain the upper bounds for genuinely wedged or dead workers. Reclaim events now also record claim_expires, last_heartbeat_at, worker_pid, and host_local so operators can see why a worker was killed.
This commit is contained in:
parent
3974a137c6
commit
88588b6159
2 changed files with 155 additions and 9 deletions
|
|
@ -1997,16 +1997,69 @@ def release_stale_claims(
|
|||
) -> int:
|
||||
"""Reset any ``running`` task whose claim has expired.
|
||||
|
||||
Returns the number of stale claims reclaimed. Safe to call often.
|
||||
A stale-by-TTL claim whose host-local worker PID is still alive is
|
||||
*extended* (with a ``claim_extended`` event) instead of being
|
||||
reclaimed. Reclaiming a live worker mid-flight produces the spawn-
|
||||
then-immediately-reclaim loop seen on slow models that spend longer
|
||||
than ``DEFAULT_CLAIM_TTL_SECONDS`` inside a single tool-free LLM
|
||||
call (#23025): no tool calls means no ``kanban_heartbeat``, even
|
||||
though the subprocess is healthy. ``enforce_max_runtime`` and
|
||||
``detect_crashed_workers`` remain the upper bounds for genuinely
|
||||
wedged or dead workers.
|
||||
|
||||
Returns the number of stale claims actually reclaimed (live-pid
|
||||
extensions don't count). Safe to call often.
|
||||
"""
|
||||
now = int(time.time())
|
||||
reclaimed = 0
|
||||
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
|
||||
stale = conn.execute(
|
||||
"SELECT id, claim_lock, worker_pid FROM tasks "
|
||||
"WHERE status = 'running' AND claim_expires IS NOT NULL AND claim_expires < ?",
|
||||
"SELECT id, claim_lock, worker_pid, claim_expires, last_heartbeat_at "
|
||||
"FROM tasks "
|
||||
"WHERE status = 'running' AND claim_expires IS NOT NULL "
|
||||
" AND claim_expires < ?",
|
||||
(now,),
|
||||
).fetchall()
|
||||
for row in stale:
|
||||
lock = row["claim_lock"] or ""
|
||||
host_local = lock.startswith(host_prefix)
|
||||
if host_local and row["worker_pid"] and _pid_alive(row["worker_pid"]):
|
||||
new_expires = now + int(DEFAULT_CLAIM_TTL_SECONDS)
|
||||
with write_txn(conn):
|
||||
cur = conn.execute(
|
||||
"UPDATE tasks SET claim_expires = ? "
|
||||
"WHERE id = ? AND status = 'running' "
|
||||
" AND claim_lock IS ? "
|
||||
" AND claim_expires IS NOT NULL "
|
||||
" AND claim_expires < ?",
|
||||
(new_expires, row["id"], row["claim_lock"], now),
|
||||
)
|
||||
if cur.rowcount != 1:
|
||||
continue
|
||||
run_id = _current_run_id(conn, row["id"])
|
||||
if run_id is not None:
|
||||
conn.execute(
|
||||
"UPDATE task_runs SET claim_expires = ? WHERE id = ?",
|
||||
(new_expires, run_id),
|
||||
)
|
||||
_append_event(
|
||||
conn, row["id"], "claim_extended",
|
||||
{
|
||||
"reason": "pid_alive",
|
||||
"worker_pid": int(row["worker_pid"]),
|
||||
"claim_lock": row["claim_lock"],
|
||||
"claim_expires_was": int(row["claim_expires"]),
|
||||
"claim_expires_now": new_expires,
|
||||
"last_heartbeat_at": (
|
||||
int(row["last_heartbeat_at"])
|
||||
if row["last_heartbeat_at"] is not None
|
||||
else None
|
||||
),
|
||||
},
|
||||
run_id=run_id,
|
||||
)
|
||||
continue
|
||||
|
||||
termination = _terminate_reclaimed_worker(
|
||||
row["worker_pid"], row["claim_lock"], signal_fn=signal_fn,
|
||||
)
|
||||
|
|
@ -2026,7 +2079,20 @@ def release_stale_claims(
|
|||
error=f"stale_lock={row['claim_lock']}",
|
||||
metadata=termination,
|
||||
)
|
||||
payload = {"stale_lock": row["claim_lock"]}
|
||||
payload = {
|
||||
"stale_lock": row["claim_lock"],
|
||||
"worker_pid": (
|
||||
int(row["worker_pid"])
|
||||
if row["worker_pid"] is not None else None
|
||||
),
|
||||
"claim_expires": int(row["claim_expires"]),
|
||||
"last_heartbeat_at": (
|
||||
int(row["last_heartbeat_at"])
|
||||
if row["last_heartbeat_at"] is not None else None
|
||||
),
|
||||
"now": now,
|
||||
"host_local": host_local,
|
||||
}
|
||||
payload.update(termination)
|
||||
_append_event(
|
||||
conn, row["id"], "reclaimed",
|
||||
|
|
|
|||
|
|
@ -177,12 +177,9 @@ def test_stale_claim_reclaimed(kanban_home, monkeypatch):
|
|||
host = _kb._claimer_id().split(":", 1)[0]
|
||||
kb.claim_task(conn, t, claimer=f"{host}:worker")
|
||||
killed: list[int] = []
|
||||
state = {"alive": True}
|
||||
|
||||
def _signal(pid, sig):
|
||||
def _signal(_pid, sig):
|
||||
killed.append(sig)
|
||||
if sig == signal.SIGTERM:
|
||||
state["alive"] = False
|
||||
|
||||
kb._set_worker_pid(conn, t, 12345)
|
||||
# Rewind claim_expires so it looks stale.
|
||||
|
|
@ -190,13 +187,96 @@ def test_stale_claim_reclaimed(kanban_home, monkeypatch):
|
|||
"UPDATE tasks SET claim_expires = ? WHERE id = ?",
|
||||
(int(time.time()) - 3600, t),
|
||||
)
|
||||
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: state["alive"])
|
||||
# Worker PID has died — exactly the case ``release_stale_claims``
|
||||
# should still reclaim (post-#23025: live PIDs are now extended).
|
||||
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
||||
reclaimed = kb.release_stale_claims(conn, signal_fn=_signal)
|
||||
assert reclaimed == 1
|
||||
assert kb.get_task(conn, t).status == "ready"
|
||||
assert killed == [signal.SIGTERM]
|
||||
|
||||
|
||||
def test_stale_claim_with_live_pid_extends_instead_of_reclaiming(
|
||||
kanban_home, monkeypatch,
|
||||
):
|
||||
"""A stale-by-TTL claim whose worker PID is still alive should be
|
||||
extended, not reclaimed (#23025). Slow models can spend longer than
|
||||
``DEFAULT_CLAIM_TTL_SECONDS`` inside a single tool-free LLM call;
|
||||
killing those healthy workers produces a respawn loop with zero
|
||||
progress."""
|
||||
import hermes_cli.kanban_db as _kb
|
||||
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="x", assignee="a")
|
||||
host = _kb._claimer_id().split(":", 1)[0]
|
||||
kb.claim_task(conn, t, claimer=f"{host}:worker")
|
||||
kb._set_worker_pid(conn, t, 12345)
|
||||
|
||||
old_expires = int(time.time()) - 60
|
||||
conn.execute(
|
||||
"UPDATE tasks SET claim_expires = ? WHERE id = ?",
|
||||
(old_expires, t),
|
||||
)
|
||||
|
||||
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: True)
|
||||
killed: list[int] = []
|
||||
reclaimed = kb.release_stale_claims(
|
||||
conn, signal_fn=lambda _p, sig: killed.append(sig),
|
||||
)
|
||||
assert reclaimed == 0
|
||||
task = kb.get_task(conn, t)
|
||||
assert task.status == "running"
|
||||
assert task.claim_expires is not None
|
||||
assert task.claim_expires > old_expires
|
||||
assert killed == [] # live worker not killed
|
||||
|
||||
kinds = [
|
||||
r["kind"] for r in conn.execute(
|
||||
"SELECT kind FROM task_events WHERE task_id = ?", (t,),
|
||||
).fetchall()
|
||||
]
|
||||
assert "claim_extended" in kinds
|
||||
assert "reclaimed" not in kinds
|
||||
|
||||
|
||||
def test_stale_claim_reclaim_event_records_diagnostic_payload(
|
||||
kanban_home, monkeypatch,
|
||||
):
|
||||
"""``reclaimed`` events should carry claim_expires, last_heartbeat_at,
|
||||
and worker_pid so operators can diagnose why a claim went stale
|
||||
(#23025: previous payload only had ``stale_lock`` which gives no
|
||||
timing context)."""
|
||||
import json
|
||||
import hermes_cli.kanban_db as _kb
|
||||
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="x", assignee="a")
|
||||
host = _kb._claimer_id().split(":", 1)[0]
|
||||
kb.claim_task(conn, t, claimer=f"{host}:worker")
|
||||
kb._set_worker_pid(conn, t, 12345)
|
||||
old_expires = int(time.time()) - 3600
|
||||
hb_at = int(time.time()) - 1800
|
||||
conn.execute(
|
||||
"UPDATE tasks SET claim_expires = ?, last_heartbeat_at = ? "
|
||||
"WHERE id = ?",
|
||||
(old_expires, hb_at, t),
|
||||
)
|
||||
|
||||
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
||||
kb.release_stale_claims(conn, signal_fn=lambda _p, _s: None)
|
||||
row = conn.execute(
|
||||
"SELECT payload FROM task_events "
|
||||
"WHERE task_id = ? AND kind = 'reclaimed'",
|
||||
(t,),
|
||||
).fetchone()
|
||||
assert row is not None
|
||||
payload = json.loads(row["payload"])
|
||||
assert payload["claim_expires"] == old_expires
|
||||
assert payload["last_heartbeat_at"] == hb_at
|
||||
assert payload["worker_pid"] == 12345
|
||||
assert payload["host_local"] is True
|
||||
|
||||
|
||||
def test_max_runtime_uses_current_run_start_after_retry(kanban_home):
|
||||
"""A retry should get a fresh max-runtime window.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue