From c002668ff0556f55ac4254f4bcf8d2408eb2cac0 Mon Sep 17 00:00:00 2001 From: Stephen Chin Date: Sat, 23 May 2026 19:23:17 -0700 Subject: [PATCH] fix(kanban): add grace period to detect_crashed_workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `detect_crashed_workers` calls `_pid_alive` on every `running` task whose claim is held by this host. The check can transiently return False for a freshly-spawned worker (fork → /proc-visibility lag, or reap-race between SIGCHLD and parent reaping). When a second dispatcher ticks inside that window it reclaims the task and spawns a duplicate worker. Add `DEFAULT_CRASH_GRACE_SECONDS = 30` and an `HERMES_KANBAN_CRASH_GRACE_SECONDS` env-var override. `detect_crashed_workers` skips the liveness check when `time.time() - started_at < grace`. The existing 15-minute claim TTL still reclaims genuinely-crashed workers; grace only suppresses the launch-window false positive. `HERMES_KANBAN_CRASH_GRACE_SECONDS=0` is set on the `kanban_home` fixture in `test_kanban_core_functionality.py` so existing tests that assert immediate reclaim retain pre-fix semantics. Companion to merged PR #23442 (`release_stale_claims`, closes #23025), which addressed the same multi-dispatcher race in the stale-claim path. Related: #20015 (`_pid_alive` false-negative behaviour), --- hermes_cli/kanban_db.py | 38 +++++++++- .../test_kanban_core_functionality.py | 3 + tests/hermes_cli/test_kanban_db.py | 74 +++++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 227a943ea65..4321c9ce417 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -134,6 +134,34 @@ def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int: return DEFAULT_CLAIM_TTL_SECONDS +# Grace period after a task transitions to ``running`` during which +# ``detect_crashed_workers`` skips the ``_pid_alive`` check. Covers the +# fork() → /proc-visibility window where liveness can transiently report +# False for a freshly-spawned worker. The 15-minute claim TTL still +# catches genuinely-crashed workers; this only suppresses false positives +# during the launch window. +DEFAULT_CRASH_GRACE_SECONDS = 30 + + +def _resolve_crash_grace_seconds() -> int: + """Return the crash-detection grace period in seconds. + + Reads ``HERMES_KANBAN_CRASH_GRACE_SECONDS`` from the environment; + falls back to ``DEFAULT_CRASH_GRACE_SECONDS`` when absent, empty, + non-integer, or negative. A value of 0 restores immediate-reclaim + behaviour (useful for tests). + """ + raw = os.environ.get("HERMES_KANBAN_CRASH_GRACE_SECONDS", "").strip() + if raw: + try: + parsed = int(raw) + except ValueError: + parsed = -1 + if parsed >= 0: + return parsed + return DEFAULT_CRASH_GRACE_SECONDS + + # Worker-context caps so build_worker_context() stays bounded on # pathological boards (retry-heavy tasks, comment storms, giant # summaries). Values chosen to fit a typical 100k-char LLM prompt with @@ -4653,7 +4681,7 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: # (task_id, pid, claimer, protocol_violation, error_text) with write_txn(conn): rows = conn.execute( - "SELECT id, worker_pid, claim_lock FROM tasks " + "SELECT id, worker_pid, claim_lock, started_at FROM tasks " "WHERE status = 'running' AND worker_pid IS NOT NULL" ).fetchall() host_prefix = f"{_claimer_id().split(':', 1)[0]}:" @@ -4662,6 +4690,14 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: lock = row["claim_lock"] or "" if not lock.startswith(host_prefix): continue + # Skip liveness check inside the launch-window grace period + # so a freshly-spawned worker isn't reclaimed before its PID + # is visible on /proc. + started_at = row["started_at"] if "started_at" in row.keys() else None + if started_at is not None: + grace = _resolve_crash_grace_seconds() + if time.time() - started_at < grace: + continue if _pid_alive(row["worker_pid"]): continue diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index 0b8386bafe4..ce8334208af 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -35,6 +35,9 @@ def kanban_home(tmp_path, monkeypatch): home = tmp_path / ".hermes" home.mkdir() monkeypatch.setenv("HERMES_HOME", str(home)) + # Existing crash-detection tests pre-date the grace window; pin to 0 + # so they keep their immediate-reclaim semantics. + monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "0") monkeypatch.setattr(Path, "home", lambda: tmp_path) # Disable the detect_crashed_workers grace period for legacy tests in # this file that claim a task and immediately expect diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index a34c659fce8..af3302cc6f3 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -564,6 +564,80 @@ def test_detect_crashed_workers_isolated_failure_normal_retry( ) +def test_detect_crashed_workers_skips_freshly_claimed_tasks( + kanban_home, monkeypatch, +): + """Grace period prevents reclaim of freshly-started tasks.""" + import hermes_cli.kanban_db as _kb + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + monkeypatch.delenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", raising=False) + + now = 1_000_000.0 + monkeypatch.setattr(_kb.time, "time", lambda: now) + + with kb.connect() as conn: + host = _kb._claimer_id().split(":", 1)[0] + tid = kb.create_task(conn, title="grace test", assignee="a") + conn.execute( + "UPDATE tasks SET status='running', worker_pid=?, " + "claim_lock=?, started_at=? WHERE id=?", + (99999, f"{host}:w", int(now), tid), + ) + conn.commit() + + # With time = now (just claimed), grace period should suppress reclaim. + crashed = kb.detect_crashed_workers(conn) + assert tid not in crashed, "should not reclaim freshly-started task" + + # With time = now + 60 (past default 30s grace), should reclaim. + monkeypatch.setattr(_kb.time, "time", lambda: now + 60) + crashed = kb.detect_crashed_workers(conn) + assert tid in crashed, "should reclaim task past grace period" + + +def test_detect_crashed_workers_grace_period_env_override( + kanban_home, monkeypatch, +): + """HERMES_KANBAN_CRASH_GRACE_SECONDS env var adjusts the window.""" + import hermes_cli.kanban_db as _kb + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "5") + + now = 2_000_000.0 + + with kb.connect() as conn: + host = _kb._claimer_id().split(":", 1)[0] + tid = kb.create_task(conn, title="env override test", assignee="a") + conn.execute( + "UPDATE tasks SET status='running', worker_pid=?, " + "claim_lock=?, started_at=? WHERE id=?", + (99999, f"{host}:w", int(now), tid), + ) + conn.commit() + + # 3s after claim: within 5s grace → no reclaim. + monkeypatch.setattr(_kb.time, "time", lambda: now + 3) + assert tid not in kb.detect_crashed_workers(conn) + + # 6s after claim: past 5s grace → reclaim. + monkeypatch.setattr(_kb.time, "time", lambda: now + 6) + assert tid in kb.detect_crashed_workers(conn) + + +def test_resolve_crash_grace_seconds_handles_bad_env(monkeypatch): + """Bad env values fall back to DEFAULT_CRASH_GRACE_SECONDS.""" + import hermes_cli.kanban_db as _kb + + for bad_val in ("notanumber", "-5", ""): + monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", bad_val) + result = _kb._resolve_crash_grace_seconds() + assert result == _kb.DEFAULT_CRASH_GRACE_SECONDS, ( + f"expected default for {bad_val!r}, got {result}" + ) + + def test_max_runtime_uses_current_run_start_after_retry(kanban_home, monkeypatch): """A retry should get a fresh max-runtime window.