mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-30 06:41:51 +00:00
fix(kanban): add grace period to detect_crashed_workers
`detect_crashed_workers` calls `_pid_alive` on every `running` task whose claim is held by this host. The check can transiently return False for a freshly-spawned worker (fork → /proc-visibility lag, or reap-race between SIGCHLD and parent reaping). When a second dispatcher ticks inside that window it reclaims the task and spawns a duplicate worker. Add `DEFAULT_CRASH_GRACE_SECONDS = 30` and an `HERMES_KANBAN_CRASH_GRACE_SECONDS` env-var override. `detect_crashed_workers` skips the liveness check when `time.time() - started_at < grace`. The existing 15-minute claim TTL still reclaims genuinely-crashed workers; grace only suppresses the launch-window false positive. `HERMES_KANBAN_CRASH_GRACE_SECONDS=0` is set on the `kanban_home` fixture in `test_kanban_core_functionality.py` so existing tests that assert immediate reclaim retain pre-fix semantics. Companion to merged PR #23442 (`release_stale_claims`, closes #23025), which addressed the same multi-dispatcher race in the stale-claim path. Related: #20015 (`_pid_alive` false-negative behaviour),
This commit is contained in:
parent
e83252dc46
commit
c002668ff0
3 changed files with 114 additions and 1 deletions
|
|
@ -134,6 +134,34 @@ def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int:
|
|||
return DEFAULT_CLAIM_TTL_SECONDS
|
||||
|
||||
|
||||
# Grace period after a task transitions to ``running`` during which
|
||||
# ``detect_crashed_workers`` skips the ``_pid_alive`` check. Covers the
|
||||
# fork() → /proc-visibility window where liveness can transiently report
|
||||
# False for a freshly-spawned worker. The 15-minute claim TTL still
|
||||
# catches genuinely-crashed workers; this only suppresses false positives
|
||||
# during the launch window.
|
||||
DEFAULT_CRASH_GRACE_SECONDS = 30
|
||||
|
||||
|
||||
def _resolve_crash_grace_seconds() -> int:
|
||||
"""Return the crash-detection grace period in seconds.
|
||||
|
||||
Reads ``HERMES_KANBAN_CRASH_GRACE_SECONDS`` from the environment;
|
||||
falls back to ``DEFAULT_CRASH_GRACE_SECONDS`` when absent, empty,
|
||||
non-integer, or negative. A value of 0 restores immediate-reclaim
|
||||
behaviour (useful for tests).
|
||||
"""
|
||||
raw = os.environ.get("HERMES_KANBAN_CRASH_GRACE_SECONDS", "").strip()
|
||||
if raw:
|
||||
try:
|
||||
parsed = int(raw)
|
||||
except ValueError:
|
||||
parsed = -1
|
||||
if parsed >= 0:
|
||||
return parsed
|
||||
return DEFAULT_CRASH_GRACE_SECONDS
|
||||
|
||||
|
||||
# Worker-context caps so build_worker_context() stays bounded on
|
||||
# pathological boards (retry-heavy tasks, comment storms, giant
|
||||
# summaries). Values chosen to fit a typical 100k-char LLM prompt with
|
||||
|
|
@ -4653,7 +4681,7 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
# (task_id, pid, claimer, protocol_violation, error_text)
|
||||
with write_txn(conn):
|
||||
rows = conn.execute(
|
||||
"SELECT id, worker_pid, claim_lock FROM tasks "
|
||||
"SELECT id, worker_pid, claim_lock, started_at FROM tasks "
|
||||
"WHERE status = 'running' AND worker_pid IS NOT NULL"
|
||||
).fetchall()
|
||||
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
|
||||
|
|
@ -4662,6 +4690,14 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
|||
lock = row["claim_lock"] or ""
|
||||
if not lock.startswith(host_prefix):
|
||||
continue
|
||||
# Skip liveness check inside the launch-window grace period
|
||||
# so a freshly-spawned worker isn't reclaimed before its PID
|
||||
# is visible on /proc.
|
||||
started_at = row["started_at"] if "started_at" in row.keys() else None
|
||||
if started_at is not None:
|
||||
grace = _resolve_crash_grace_seconds()
|
||||
if time.time() - started_at < grace:
|
||||
continue
|
||||
if _pid_alive(row["worker_pid"]):
|
||||
continue
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,9 @@ def kanban_home(tmp_path, monkeypatch):
|
|||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
# Existing crash-detection tests pre-date the grace window; pin to 0
|
||||
# so they keep their immediate-reclaim semantics.
|
||||
monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "0")
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
# Disable the detect_crashed_workers grace period for legacy tests in
|
||||
# this file that claim a task and immediately expect
|
||||
|
|
|
|||
|
|
@ -564,6 +564,80 @@ def test_detect_crashed_workers_isolated_failure_normal_retry(
|
|||
)
|
||||
|
||||
|
||||
def test_detect_crashed_workers_skips_freshly_claimed_tasks(
|
||||
kanban_home, monkeypatch,
|
||||
):
|
||||
"""Grace period prevents reclaim of freshly-started tasks."""
|
||||
import hermes_cli.kanban_db as _kb
|
||||
|
||||
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
||||
monkeypatch.delenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", raising=False)
|
||||
|
||||
now = 1_000_000.0
|
||||
monkeypatch.setattr(_kb.time, "time", lambda: now)
|
||||
|
||||
with kb.connect() as conn:
|
||||
host = _kb._claimer_id().split(":", 1)[0]
|
||||
tid = kb.create_task(conn, title="grace test", assignee="a")
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status='running', worker_pid=?, "
|
||||
"claim_lock=?, started_at=? WHERE id=?",
|
||||
(99999, f"{host}:w", int(now), tid),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# With time = now (just claimed), grace period should suppress reclaim.
|
||||
crashed = kb.detect_crashed_workers(conn)
|
||||
assert tid not in crashed, "should not reclaim freshly-started task"
|
||||
|
||||
# With time = now + 60 (past default 30s grace), should reclaim.
|
||||
monkeypatch.setattr(_kb.time, "time", lambda: now + 60)
|
||||
crashed = kb.detect_crashed_workers(conn)
|
||||
assert tid in crashed, "should reclaim task past grace period"
|
||||
|
||||
|
||||
def test_detect_crashed_workers_grace_period_env_override(
|
||||
kanban_home, monkeypatch,
|
||||
):
|
||||
"""HERMES_KANBAN_CRASH_GRACE_SECONDS env var adjusts the window."""
|
||||
import hermes_cli.kanban_db as _kb
|
||||
|
||||
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
|
||||
monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "5")
|
||||
|
||||
now = 2_000_000.0
|
||||
|
||||
with kb.connect() as conn:
|
||||
host = _kb._claimer_id().split(":", 1)[0]
|
||||
tid = kb.create_task(conn, title="env override test", assignee="a")
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status='running', worker_pid=?, "
|
||||
"claim_lock=?, started_at=? WHERE id=?",
|
||||
(99999, f"{host}:w", int(now), tid),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# 3s after claim: within 5s grace → no reclaim.
|
||||
monkeypatch.setattr(_kb.time, "time", lambda: now + 3)
|
||||
assert tid not in kb.detect_crashed_workers(conn)
|
||||
|
||||
# 6s after claim: past 5s grace → reclaim.
|
||||
monkeypatch.setattr(_kb.time, "time", lambda: now + 6)
|
||||
assert tid in kb.detect_crashed_workers(conn)
|
||||
|
||||
|
||||
def test_resolve_crash_grace_seconds_handles_bad_env(monkeypatch):
|
||||
"""Bad env values fall back to DEFAULT_CRASH_GRACE_SECONDS."""
|
||||
import hermes_cli.kanban_db as _kb
|
||||
|
||||
for bad_val in ("notanumber", "-5", ""):
|
||||
monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", bad_val)
|
||||
result = _kb._resolve_crash_grace_seconds()
|
||||
assert result == _kb.DEFAULT_CRASH_GRACE_SECONDS, (
|
||||
f"expected default for {bad_val!r}, got {result}"
|
||||
)
|
||||
|
||||
|
||||
def test_max_runtime_uses_current_run_start_after_retry(kanban_home, monkeypatch):
|
||||
"""A retry should get a fresh max-runtime window.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue