From 06f24351c57666e5a15de8ed7b8743b694b5a809 Mon Sep 17 00:00:00 2001 From: LeonSGP43 Date: Thu, 7 May 2026 18:49:18 +0800 Subject: [PATCH] fix(kanban): stop reclaimed workers before retry --- hermes_cli/kanban_db.py | 138 ++++++++++++++---- .../test_kanban_core_functionality.py | 20 ++- tests/hermes_cli/test_kanban_db.py | 21 ++- 3 files changed, 141 insertions(+), 38 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 2d2f1b2ecf..3b38c124e3 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1859,34 +1859,47 @@ def heartbeat_claim( return False -def release_stale_claims(conn: sqlite3.Connection) -> int: +def release_stale_claims( + conn: sqlite3.Connection, + *, + signal_fn=None, +) -> int: """Reset any ``running`` task whose claim has expired. Returns the number of stale claims reclaimed. Safe to call often. """ now = int(time.time()) reclaimed = 0 - with write_txn(conn): - stale = conn.execute( - "SELECT id, claim_lock FROM tasks " - "WHERE status = 'running' AND claim_expires IS NOT NULL AND claim_expires < ?", - (now,), - ).fetchall() - for row in stale: - conn.execute( + stale = conn.execute( + "SELECT id, claim_lock, worker_pid FROM tasks " + "WHERE status = 'running' AND claim_expires IS NOT NULL AND claim_expires < ?", + (now,), + ).fetchall() + for row in stale: + termination = _terminate_reclaimed_worker( + row["worker_pid"], row["claim_lock"], signal_fn=signal_fn, + ) + with write_txn(conn): + cur = conn.execute( "UPDATE tasks SET status = 'ready', claim_lock = NULL, " "claim_expires = NULL, worker_pid = NULL " - "WHERE id = ? AND status = 'running'", - (row["id"],), + "WHERE id = ? AND status = 'running' AND claim_lock IS ? " + "AND claim_expires IS NOT NULL AND claim_expires < ?", + (row["id"], row["claim_lock"], now), ) + if cur.rowcount != 1: + continue run_id = _end_run( conn, row["id"], outcome="reclaimed", status="reclaimed", error=f"stale_lock={row['claim_lock']}", + metadata=termination, ) + payload = {"stale_lock": row["claim_lock"]} + payload.update(termination) _append_event( conn, row["id"], "reclaimed", - {"stale_lock": row["claim_lock"]}, + payload, run_id=run_id, ) reclaimed += 1 @@ -1898,6 +1911,7 @@ def reclaim_task( task_id: str, *, reason: Optional[str] = None, + signal_fn=None, ) -> bool: """Operator-driven reclaim: release the claim and reset to ``ready``. @@ -1910,24 +1924,29 @@ def reclaim_task( Returns True if a reclaim happened, False if the task isn't in a reclaimable state (not running, or doesn't exist). """ + row = conn.execute( + "SELECT status, claim_lock, worker_pid FROM tasks WHERE id = ?", + (task_id,), + ).fetchone() + if not row: + return False + if row["status"] != "running" and row["claim_lock"] is None: + # Nothing to reclaim — already ready / blocked / done. + return False + prev_lock = row["claim_lock"] + termination = _terminate_reclaimed_worker( + row["worker_pid"], prev_lock, signal_fn=signal_fn, + ) with write_txn(conn): - row = conn.execute( - "SELECT status, claim_lock, worker_pid FROM tasks WHERE id = ?", - (task_id,), - ).fetchone() - if not row: - return False - if row["status"] != "running" and row["claim_lock"] is None: - # Nothing to reclaim — already ready / blocked / done. - return False - prev_lock = row["claim_lock"] - prev_pid = row["worker_pid"] - conn.execute( + cur = conn.execute( "UPDATE tasks SET status = 'ready', claim_lock = NULL, " "claim_expires = NULL, worker_pid = NULL " - "WHERE id = ? AND status IN ('running', 'ready', 'blocked')", - (task_id,), + "WHERE id = ? AND status IN ('running', 'ready', 'blocked') " + "AND claim_lock IS ?", + (task_id, prev_lock), ) + if cur.rowcount != 1: + return False run_id = _end_run( conn, task_id, outcome="reclaimed", status="reclaimed", @@ -1935,15 +1954,17 @@ def reclaim_task( f"manual_reclaim: {reason}" if reason else f"manual_reclaim lock={prev_lock}" ), + metadata=termination, ) + payload = { + "manual": True, + "reason": reason, + "prev_lock": prev_lock, + } + payload.update(termination) _append_event( conn, task_id, "reclaimed", - { - "manual": True, - "reason": reason, - "prev_lock": prev_lock, - "prev_pid": prev_pid, - }, + payload, run_id=run_id, ) # Operator intervention — they've looked at the task, so the @@ -2652,6 +2673,59 @@ def _pid_alive(pid: Optional[int]) -> bool: return True +def _terminate_reclaimed_worker( + pid: Optional[int], + claim_lock: Optional[str], + *, + signal_fn=None, +) -> dict[str, Any]: + """Best-effort host-local worker termination for reclaim paths.""" + import signal + + info: dict[str, Any] = { + "prev_pid": int(pid) if pid else None, + "host_local": False, + "termination_attempted": False, + "terminated": False, + "sigkill": False, + } + if not pid or pid <= 0 or not claim_lock: + return info + + host_prefix = f"{_claimer_id().split(':', 1)[0]}:" + if not str(claim_lock).startswith(host_prefix): + return info + info["host_local"] = True + + kill = signal_fn if signal_fn is not None else ( + os.kill if hasattr(os, "kill") else None + ) + if kill is None: + return info + + info["termination_attempted"] = True + try: + kill(int(pid), signal.SIGTERM) + except (ProcessLookupError, OSError): + return info + + for _ in range(10): + if not _pid_alive(pid): + info["terminated"] = True + return info + time.sleep(0.5) + + if _pid_alive(pid): + try: + kill(int(pid), signal.SIGKILL) + info["sigkill"] = True + except (ProcessLookupError, OSError): + return info + + info["terminated"] = not _pid_alive(pid) + return info + + def heartbeat_worker( conn: sqlite3.Connection, task_id: str, diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index 1e286d7ce6..613c230847 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -3283,17 +3283,28 @@ def test_complete_prose_scan_ignores_existing_ids(kanban_home): # Recovery helpers (reclaim + reassign) # --------------------------------------------------------------------------- -def test_reclaim_task_resets_running_to_ready(kanban_home): +def test_reclaim_task_resets_running_to_ready(kanban_home, monkeypatch): """Manual reclaim releases the claim, resets status, and emits a ``reclaimed`` event even when claim_expires has not passed.""" + import signal import time import secrets + import hermes_cli.kanban_db as _kb conn = kb.connect() try: t = kb.create_task(conn, title="stuck", assignee="broken") # Simulate a live claim (not expired). - lock = secrets.token_hex(8) + lock = f"{_kb._claimer_id().split(':', 1)[0]}:{secrets.token_hex(8)}" future = int(time.time()) + 3600 + killed: list[int] = [] + state = {"alive": True} + + def _signal(pid, sig): + killed.append(sig) + if sig == signal.SIGTERM: + state["alive"] = False + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: state["alive"]) conn.execute( "UPDATE tasks SET status='running', claim_lock=?, claim_expires=?, " "worker_pid=? WHERE id=?", @@ -3312,7 +3323,7 @@ def test_reclaim_task_resets_running_to_ready(kanban_home): assert kb.release_stale_claims(conn) == 0 # reclaim_task should work immediately. - assert kb.reclaim_task(conn, t, reason="test reason") is True + assert kb.reclaim_task(conn, t, reason="test reason", signal_fn=_signal) is True row = conn.execute( "SELECT status, claim_lock, worker_pid FROM tasks WHERE id=?", @@ -3333,6 +3344,9 @@ def test_reclaim_task_resets_running_to_ready(kanban_home): assert len(reclaim_evs) == 1 assert reclaim_evs[0].get("manual") is True assert reclaim_evs[0].get("reason") == "test reason" + assert reclaim_evs[0].get("termination_attempted") is True + assert reclaim_evs[0].get("terminated") is True + assert killed == [signal.SIGTERM] finally: conn.close() diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index 7068e773d1..2375d6c4bc 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -168,18 +168,33 @@ def test_claim_fails_on_non_ready(kanban_home): assert kb.claim_task(conn, t) is None -def test_stale_claim_reclaimed(kanban_home): +def test_stale_claim_reclaimed(kanban_home, monkeypatch): + import signal + import hermes_cli.kanban_db as _kb + with kb.connect() as conn: t = kb.create_task(conn, title="x", assignee="a") - kb.claim_task(conn, t) + host = _kb._claimer_id().split(":", 1)[0] + kb.claim_task(conn, t, claimer=f"{host}:worker") + killed: list[int] = [] + state = {"alive": True} + + def _signal(pid, sig): + killed.append(sig) + if sig == signal.SIGTERM: + state["alive"] = False + + kb._set_worker_pid(conn, t, 12345) # Rewind claim_expires so it looks stale. conn.execute( "UPDATE tasks SET claim_expires = ? WHERE id = ?", (int(time.time()) - 3600, t), ) - reclaimed = kb.release_stale_claims(conn) + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: state["alive"]) + reclaimed = kb.release_stale_claims(conn, signal_fn=_signal) assert reclaimed == 1 assert kb.get_task(conn, t).status == "ready" + assert killed == [signal.SIGTERM] def test_max_runtime_uses_current_run_start_after_retry(kanban_home):