fix(kanban): stop reclaimed workers before retry

This commit is contained in:
LeonSGP43 2026-05-07 18:49:18 +08:00 committed by Teknium
parent 63bd690a50
commit 06f24351c5
3 changed files with 141 additions and 38 deletions

View file

@ -1859,34 +1859,47 @@ def heartbeat_claim(
return False
def release_stale_claims(conn: sqlite3.Connection) -> int:
def release_stale_claims(
conn: sqlite3.Connection,
*,
signal_fn=None,
) -> int:
"""Reset any ``running`` task whose claim has expired.
Returns the number of stale claims reclaimed. Safe to call often.
"""
now = int(time.time())
reclaimed = 0
with write_txn(conn):
stale = conn.execute(
"SELECT id, claim_lock FROM tasks "
"WHERE status = 'running' AND claim_expires IS NOT NULL AND claim_expires < ?",
(now,),
).fetchall()
for row in stale:
conn.execute(
stale = conn.execute(
"SELECT id, claim_lock, worker_pid FROM tasks "
"WHERE status = 'running' AND claim_expires IS NOT NULL AND claim_expires < ?",
(now,),
).fetchall()
for row in stale:
termination = _terminate_reclaimed_worker(
row["worker_pid"], row["claim_lock"], signal_fn=signal_fn,
)
with write_txn(conn):
cur = conn.execute(
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL "
"WHERE id = ? AND status = 'running'",
(row["id"],),
"WHERE id = ? AND status = 'running' AND claim_lock IS ? "
"AND claim_expires IS NOT NULL AND claim_expires < ?",
(row["id"], row["claim_lock"], now),
)
if cur.rowcount != 1:
continue
run_id = _end_run(
conn, row["id"],
outcome="reclaimed", status="reclaimed",
error=f"stale_lock={row['claim_lock']}",
metadata=termination,
)
payload = {"stale_lock": row["claim_lock"]}
payload.update(termination)
_append_event(
conn, row["id"], "reclaimed",
{"stale_lock": row["claim_lock"]},
payload,
run_id=run_id,
)
reclaimed += 1
@ -1898,6 +1911,7 @@ def reclaim_task(
task_id: str,
*,
reason: Optional[str] = None,
signal_fn=None,
) -> bool:
"""Operator-driven reclaim: release the claim and reset to ``ready``.
@ -1910,24 +1924,29 @@ def reclaim_task(
Returns True if a reclaim happened, False if the task isn't in a
reclaimable state (not running, or doesn't exist).
"""
row = conn.execute(
"SELECT status, claim_lock, worker_pid FROM tasks WHERE id = ?",
(task_id,),
).fetchone()
if not row:
return False
if row["status"] != "running" and row["claim_lock"] is None:
# Nothing to reclaim — already ready / blocked / done.
return False
prev_lock = row["claim_lock"]
termination = _terminate_reclaimed_worker(
row["worker_pid"], prev_lock, signal_fn=signal_fn,
)
with write_txn(conn):
row = conn.execute(
"SELECT status, claim_lock, worker_pid FROM tasks WHERE id = ?",
(task_id,),
).fetchone()
if not row:
return False
if row["status"] != "running" and row["claim_lock"] is None:
# Nothing to reclaim — already ready / blocked / done.
return False
prev_lock = row["claim_lock"]
prev_pid = row["worker_pid"]
conn.execute(
cur = conn.execute(
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL "
"WHERE id = ? AND status IN ('running', 'ready', 'blocked')",
(task_id,),
"WHERE id = ? AND status IN ('running', 'ready', 'blocked') "
"AND claim_lock IS ?",
(task_id, prev_lock),
)
if cur.rowcount != 1:
return False
run_id = _end_run(
conn, task_id,
outcome="reclaimed", status="reclaimed",
@ -1935,15 +1954,17 @@ def reclaim_task(
f"manual_reclaim: {reason}" if reason
else f"manual_reclaim lock={prev_lock}"
),
metadata=termination,
)
payload = {
"manual": True,
"reason": reason,
"prev_lock": prev_lock,
}
payload.update(termination)
_append_event(
conn, task_id, "reclaimed",
{
"manual": True,
"reason": reason,
"prev_lock": prev_lock,
"prev_pid": prev_pid,
},
payload,
run_id=run_id,
)
# Operator intervention — they've looked at the task, so the
@ -2652,6 +2673,59 @@ def _pid_alive(pid: Optional[int]) -> bool:
return True
def _terminate_reclaimed_worker(
pid: Optional[int],
claim_lock: Optional[str],
*,
signal_fn=None,
) -> dict[str, Any]:
"""Best-effort host-local worker termination for reclaim paths."""
import signal
info: dict[str, Any] = {
"prev_pid": int(pid) if pid else None,
"host_local": False,
"termination_attempted": False,
"terminated": False,
"sigkill": False,
}
if not pid or pid <= 0 or not claim_lock:
return info
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
if not str(claim_lock).startswith(host_prefix):
return info
info["host_local"] = True
kill = signal_fn if signal_fn is not None else (
os.kill if hasattr(os, "kill") else None
)
if kill is None:
return info
info["termination_attempted"] = True
try:
kill(int(pid), signal.SIGTERM)
except (ProcessLookupError, OSError):
return info
for _ in range(10):
if not _pid_alive(pid):
info["terminated"] = True
return info
time.sleep(0.5)
if _pid_alive(pid):
try:
kill(int(pid), signal.SIGKILL)
info["sigkill"] = True
except (ProcessLookupError, OSError):
return info
info["terminated"] = not _pid_alive(pid)
return info
def heartbeat_worker(
conn: sqlite3.Connection,
task_id: str,

View file

@ -3283,17 +3283,28 @@ def test_complete_prose_scan_ignores_existing_ids(kanban_home):
# Recovery helpers (reclaim + reassign)
# ---------------------------------------------------------------------------
def test_reclaim_task_resets_running_to_ready(kanban_home):
def test_reclaim_task_resets_running_to_ready(kanban_home, monkeypatch):
"""Manual reclaim releases the claim, resets status, and emits a
``reclaimed`` event even when claim_expires has not passed."""
import signal
import time
import secrets
import hermes_cli.kanban_db as _kb
conn = kb.connect()
try:
t = kb.create_task(conn, title="stuck", assignee="broken")
# Simulate a live claim (not expired).
lock = secrets.token_hex(8)
lock = f"{_kb._claimer_id().split(':', 1)[0]}:{secrets.token_hex(8)}"
future = int(time.time()) + 3600
killed: list[int] = []
state = {"alive": True}
def _signal(pid, sig):
killed.append(sig)
if sig == signal.SIGTERM:
state["alive"] = False
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: state["alive"])
conn.execute(
"UPDATE tasks SET status='running', claim_lock=?, claim_expires=?, "
"worker_pid=? WHERE id=?",
@ -3312,7 +3323,7 @@ def test_reclaim_task_resets_running_to_ready(kanban_home):
assert kb.release_stale_claims(conn) == 0
# reclaim_task should work immediately.
assert kb.reclaim_task(conn, t, reason="test reason") is True
assert kb.reclaim_task(conn, t, reason="test reason", signal_fn=_signal) is True
row = conn.execute(
"SELECT status, claim_lock, worker_pid FROM tasks WHERE id=?",
@ -3333,6 +3344,9 @@ def test_reclaim_task_resets_running_to_ready(kanban_home):
assert len(reclaim_evs) == 1
assert reclaim_evs[0].get("manual") is True
assert reclaim_evs[0].get("reason") == "test reason"
assert reclaim_evs[0].get("termination_attempted") is True
assert reclaim_evs[0].get("terminated") is True
assert killed == [signal.SIGTERM]
finally:
conn.close()

View file

@ -168,18 +168,33 @@ def test_claim_fails_on_non_ready(kanban_home):
assert kb.claim_task(conn, t) is None
def test_stale_claim_reclaimed(kanban_home):
def test_stale_claim_reclaimed(kanban_home, monkeypatch):
import signal
import hermes_cli.kanban_db as _kb
with kb.connect() as conn:
t = kb.create_task(conn, title="x", assignee="a")
kb.claim_task(conn, t)
host = _kb._claimer_id().split(":", 1)[0]
kb.claim_task(conn, t, claimer=f"{host}:worker")
killed: list[int] = []
state = {"alive": True}
def _signal(pid, sig):
killed.append(sig)
if sig == signal.SIGTERM:
state["alive"] = False
kb._set_worker_pid(conn, t, 12345)
# Rewind claim_expires so it looks stale.
conn.execute(
"UPDATE tasks SET claim_expires = ? WHERE id = ?",
(int(time.time()) - 3600, t),
)
reclaimed = kb.release_stale_claims(conn)
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: state["alive"])
reclaimed = kb.release_stale_claims(conn, signal_fn=_signal)
assert reclaimed == 1
assert kb.get_task(conn, t).status == "ready"
assert killed == [signal.SIGTERM]
def test_max_runtime_uses_current_run_start_after_retry(kanban_home):