fix(kanban): add grace period to detect_crashed_workers

`detect_crashed_workers` calls `_pid_alive` on every `running` task whose
claim is held by this host. The check can transiently return False for a
freshly-spawned worker (fork → /proc-visibility lag, or reap-race
between SIGCHLD and parent reaping). When a second dispatcher ticks
inside that window it reclaims the task and spawns a duplicate worker.

Add `DEFAULT_CRASH_GRACE_SECONDS = 30` and an
`HERMES_KANBAN_CRASH_GRACE_SECONDS` env-var override.
`detect_crashed_workers` skips the liveness check when
`time.time() - started_at < grace`. The existing 15-minute claim TTL
still reclaims genuinely-crashed workers; grace only suppresses the
launch-window false positive.

`HERMES_KANBAN_CRASH_GRACE_SECONDS=0` is set on the `kanban_home`
fixture in `test_kanban_core_functionality.py` so existing tests that
assert immediate reclaim retain pre-fix semantics.

Companion to merged PR #23442 (`release_stale_claims`, closes #23025),
which addressed the same multi-dispatcher race in the stale-claim path.
Related: #20015 (`_pid_alive` false-negative behaviour),
This commit is contained in:
Stephen Chin 2026-05-23 19:23:17 -07:00 committed by kshitij
parent e83252dc46
commit c002668ff0
3 changed files with 114 additions and 1 deletions

View file

@ -134,6 +134,34 @@ def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int:
return DEFAULT_CLAIM_TTL_SECONDS
# Grace period after a task transitions to ``running`` during which
# ``detect_crashed_workers`` skips the ``_pid_alive`` check. Covers the
# fork() → /proc-visibility window where liveness can transiently report
# False for a freshly-spawned worker. The 15-minute claim TTL still
# catches genuinely-crashed workers; this only suppresses false positives
# during the launch window.
DEFAULT_CRASH_GRACE_SECONDS = 30
def _resolve_crash_grace_seconds() -> int:
"""Return the crash-detection grace period in seconds.
Reads ``HERMES_KANBAN_CRASH_GRACE_SECONDS`` from the environment;
falls back to ``DEFAULT_CRASH_GRACE_SECONDS`` when absent, empty,
non-integer, or negative. A value of 0 restores immediate-reclaim
behaviour (useful for tests).
"""
raw = os.environ.get("HERMES_KANBAN_CRASH_GRACE_SECONDS", "").strip()
if raw:
try:
parsed = int(raw)
except ValueError:
parsed = -1
if parsed >= 0:
return parsed
return DEFAULT_CRASH_GRACE_SECONDS
# Worker-context caps so build_worker_context() stays bounded on
# pathological boards (retry-heavy tasks, comment storms, giant
# summaries). Values chosen to fit a typical 100k-char LLM prompt with
@ -4653,7 +4681,7 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
# (task_id, pid, claimer, protocol_violation, error_text)
with write_txn(conn):
rows = conn.execute(
"SELECT id, worker_pid, claim_lock FROM tasks "
"SELECT id, worker_pid, claim_lock, started_at FROM tasks "
"WHERE status = 'running' AND worker_pid IS NOT NULL"
).fetchall()
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
@ -4662,6 +4690,14 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
lock = row["claim_lock"] or ""
if not lock.startswith(host_prefix):
continue
# Skip liveness check inside the launch-window grace period
# so a freshly-spawned worker isn't reclaimed before its PID
# is visible on /proc.
started_at = row["started_at"] if "started_at" in row.keys() else None
if started_at is not None:
grace = _resolve_crash_grace_seconds()
if time.time() - started_at < grace:
continue
if _pid_alive(row["worker_pid"]):
continue

View file

@ -35,6 +35,9 @@ def kanban_home(tmp_path, monkeypatch):
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
# Existing crash-detection tests pre-date the grace window; pin to 0
# so they keep their immediate-reclaim semantics.
monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "0")
monkeypatch.setattr(Path, "home", lambda: tmp_path)
# Disable the detect_crashed_workers grace period for legacy tests in
# this file that claim a task and immediately expect

View file

@ -564,6 +564,80 @@ def test_detect_crashed_workers_isolated_failure_normal_retry(
)
def test_detect_crashed_workers_skips_freshly_claimed_tasks(
kanban_home, monkeypatch,
):
"""Grace period prevents reclaim of freshly-started tasks."""
import hermes_cli.kanban_db as _kb
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
monkeypatch.delenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", raising=False)
now = 1_000_000.0
monkeypatch.setattr(_kb.time, "time", lambda: now)
with kb.connect() as conn:
host = _kb._claimer_id().split(":", 1)[0]
tid = kb.create_task(conn, title="grace test", assignee="a")
conn.execute(
"UPDATE tasks SET status='running', worker_pid=?, "
"claim_lock=?, started_at=? WHERE id=?",
(99999, f"{host}:w", int(now), tid),
)
conn.commit()
# With time = now (just claimed), grace period should suppress reclaim.
crashed = kb.detect_crashed_workers(conn)
assert tid not in crashed, "should not reclaim freshly-started task"
# With time = now + 60 (past default 30s grace), should reclaim.
monkeypatch.setattr(_kb.time, "time", lambda: now + 60)
crashed = kb.detect_crashed_workers(conn)
assert tid in crashed, "should reclaim task past grace period"
def test_detect_crashed_workers_grace_period_env_override(
kanban_home, monkeypatch,
):
"""HERMES_KANBAN_CRASH_GRACE_SECONDS env var adjusts the window."""
import hermes_cli.kanban_db as _kb
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "5")
now = 2_000_000.0
with kb.connect() as conn:
host = _kb._claimer_id().split(":", 1)[0]
tid = kb.create_task(conn, title="env override test", assignee="a")
conn.execute(
"UPDATE tasks SET status='running', worker_pid=?, "
"claim_lock=?, started_at=? WHERE id=?",
(99999, f"{host}:w", int(now), tid),
)
conn.commit()
# 3s after claim: within 5s grace → no reclaim.
monkeypatch.setattr(_kb.time, "time", lambda: now + 3)
assert tid not in kb.detect_crashed_workers(conn)
# 6s after claim: past 5s grace → reclaim.
monkeypatch.setattr(_kb.time, "time", lambda: now + 6)
assert tid in kb.detect_crashed_workers(conn)
def test_resolve_crash_grace_seconds_handles_bad_env(monkeypatch):
"""Bad env values fall back to DEFAULT_CRASH_GRACE_SECONDS."""
import hermes_cli.kanban_db as _kb
for bad_val in ("notanumber", "-5", ""):
monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", bad_val)
result = _kb._resolve_crash_grace_seconds()
assert result == _kb.DEFAULT_CRASH_GRACE_SECONDS, (
f"expected default for {bad_val!r}, got {result}"
)
def test_max_runtime_uses_current_run_start_after_retry(kanban_home, monkeypatch):
"""A retry should get a fresh max-runtime window.