mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-31 06:51:29 +00:00
fix(kanban): add grace period to detect_crashed_workers
`detect_crashed_workers` calls `_pid_alive` on every `running` task whose claim is held by this host. The check can transiently return False for a freshly-spawned worker (fork → /proc-visibility lag, or reap-race between SIGCHLD and parent reaping). When a second dispatcher ticks inside that window, it reclaims the task and spawns a duplicate worker. Add `DEFAULT_CRASH_GRACE_SECONDS = 30` and a `HERMES_KANBAN_CRASH_GRACE_SECONDS` env-var override. `detect_crashed_workers` skips the liveness check when `time.time() - started_at < grace`. The existing 15-minute claim TTL still reclaims genuinely-crashed workers; grace only suppresses the launch-window false positive. `HERMES_KANBAN_CRASH_GRACE_SECONDS=0` is set on the `kanban_home` fixture in `test_kanban_core_functionality.py` so existing tests that assert immediate reclaim retain pre-fix semantics. Companion to merged PR #23442 (`release_stale_claims`, closes #23025), which addressed the same multi-dispatcher race in the stale-claim path. Related: #20015 (`_pid_alive` false-negative behaviour).
This commit is contained in:
parent
e83252dc46
commit
c002668ff0
3 changed files with 114 additions and 1 deletions
|
|
@ -134,6 +134,34 @@ def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int:
|
||||||
return DEFAULT_CLAIM_TTL_SECONDS
|
return DEFAULT_CLAIM_TTL_SECONDS
|
||||||
|
|
||||||
|
|
||||||
|
# Grace period after a task transitions to ``running`` during which
|
||||||
|
# ``detect_crashed_workers`` skips the ``_pid_alive`` check. Covers the
|
||||||
|
# fork() → /proc-visibility window where liveness can transiently report
|
||||||
|
# False for a freshly-spawned worker. The 15-minute claim TTL still
|
||||||
|
# catches genuinely-crashed workers; this only suppresses false positives
|
||||||
|
# during the launch window.
|
||||||
|
DEFAULT_CRASH_GRACE_SECONDS: int = 30  # seconds; overridable via HERMES_KANBAN_CRASH_GRACE_SECONDS
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_crash_grace_seconds() -> int:
    """Resolve the crash-detection grace window, in seconds.

    The ``HERMES_KANBAN_CRASH_GRACE_SECONDS`` environment variable wins
    when it holds a non-negative integer. In every other case (variable
    unset, blank after stripping whitespace, not parseable by ``int``,
    or negative) ``DEFAULT_CRASH_GRACE_SECONDS`` is returned instead.

    Setting the variable to ``0`` yields a zero-length window, i.e. the
    immediate-reclaim behaviour that tests rely on.
    """
    env_value = os.environ.get("HERMES_KANBAN_CRASH_GRACE_SECONDS", "").strip()
    if not env_value:
        # Unset or whitespace-only: nothing to parse.
        return DEFAULT_CRASH_GRACE_SECONDS

    try:
        seconds = int(env_value)
    except ValueError:
        # Malformed override is ignored rather than treated as fatal.
        return DEFAULT_CRASH_GRACE_SECONDS

    # A negative window is meaningless; treat it like a bad value.
    return seconds if seconds >= 0 else DEFAULT_CRASH_GRACE_SECONDS
|
||||||
|
|
||||||
|
|
||||||
# Worker-context caps so build_worker_context() stays bounded on
|
# Worker-context caps so build_worker_context() stays bounded on
|
||||||
# pathological boards (retry-heavy tasks, comment storms, giant
|
# pathological boards (retry-heavy tasks, comment storms, giant
|
||||||
# summaries). Values chosen to fit a typical 100k-char LLM prompt with
|
# summaries). Values chosen to fit a typical 100k-char LLM prompt with
|
||||||
|
|
@ -4653,7 +4681,7 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
||||||
# (task_id, pid, claimer, protocol_violation, error_text)
|
# (task_id, pid, claimer, protocol_violation, error_text)
|
||||||
with write_txn(conn):
|
with write_txn(conn):
|
||||||
rows = conn.execute(
|
rows = conn.execute(
|
||||||
"SELECT id, worker_pid, claim_lock FROM tasks "
|
"SELECT id, worker_pid, claim_lock, started_at FROM tasks "
|
||||||
"WHERE status = 'running' AND worker_pid IS NOT NULL"
|
"WHERE status = 'running' AND worker_pid IS NOT NULL"
|
||||||
).fetchall()
|
).fetchall()
|
||||||
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
|
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
|
||||||
|
|
@ -4662,6 +4690,14 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
||||||
lock = row["claim_lock"] or ""
|
lock = row["claim_lock"] or ""
|
||||||
if not lock.startswith(host_prefix):
|
if not lock.startswith(host_prefix):
|
||||||
continue
|
continue
|
||||||
|
# Skip liveness check inside the launch-window grace period
|
||||||
|
# so a freshly-spawned worker isn't reclaimed before its PID
|
||||||
|
# is visible on /proc.
|
||||||
|
started_at = row["started_at"] if "started_at" in row.keys() else None
|
||||||
|
if started_at is not None:
|
||||||
|
grace = _resolve_crash_grace_seconds()
|
||||||
|
if time.time() - started_at < grace:
|
||||||
|
continue
|
||||||
if _pid_alive(row["worker_pid"]):
|
if _pid_alive(row["worker_pid"]):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,9 @@ def kanban_home(tmp_path, monkeypatch):
|
||||||
home = tmp_path / ".hermes"
|
home = tmp_path / ".hermes"
|
||||||
home.mkdir()
|
home.mkdir()
|
||||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||||
|
# Existing crash-detection tests pre-date the grace window; pin to 0
|
||||||
|
# so they keep their immediate-reclaim semantics.
|
||||||
|
monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "0")
|
||||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||||
# Disable the detect_crashed_workers grace period for legacy tests in
|
# Disable the detect_crashed_workers grace period for legacy tests in
|
||||||
# this file that claim a task and immediately expect
|
# this file that claim a task and immediately expect
|
||||||
|
|
|
||||||
|
|
@ -564,6 +564,80 @@ def test_detect_crashed_workers_isolated_failure_normal_retry(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_detect_crashed_workers_skips_freshly_claimed_tasks(
    kanban_home, monkeypatch,
):
    """A task that only just started running is not reclaimed.

    ``_pid_alive`` is forced to report False so the grace window is the
    only thing that can keep the task from being flagged. The env
    override is removed so the default window applies.
    """
    import hermes_cli.kanban_db as _kb

    monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
    monkeypatch.delenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", raising=False)

    # Freeze the clock at the moment the task is marked running.
    claim_time = 1_000_000.0
    monkeypatch.setattr(_kb.time, "time", lambda: claim_time)

    with kb.connect() as conn:
        host_name = _kb._claimer_id().split(":", 1)[0]
        task_id = kb.create_task(conn, title="grace test", assignee="a")
        update_sql = (
            "UPDATE tasks SET status='running', worker_pid=?, "
            "claim_lock=?, started_at=? WHERE id=?"
        )
        update_args = (99999, f"{host_name}:w", int(claim_time), task_id)
        conn.execute(update_sql, update_args)
        conn.commit()

        # Zero seconds elapsed: still inside the grace window.
        reclaimed = kb.detect_crashed_workers(conn)
        assert task_id not in reclaimed, "should not reclaim freshly-started task"

        # Sixty seconds elapsed: beyond the default 30s window.
        monkeypatch.setattr(_kb.time, "time", lambda: claim_time + 60)
        reclaimed = kb.detect_crashed_workers(conn)
        assert task_id in reclaimed, "should reclaim task past grace period"
|
||||||
|
|
||||||
|
|
||||||
|
def test_detect_crashed_workers_grace_period_env_override(
    kanban_home, monkeypatch,
):
    """The env var ``HERMES_KANBAN_CRASH_GRACE_SECONDS`` sets the window.

    With the override at 5 seconds and ``_pid_alive`` forced to False,
    the task survives a check 3s after start and is reclaimed at 6s.
    """
    import hermes_cli.kanban_db as _kb

    monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
    monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", "5")

    claim_time = 2_000_000.0

    with kb.connect() as conn:
        host_name = _kb._claimer_id().split(":", 1)[0]
        task_id = kb.create_task(conn, title="env override test", assignee="a")
        update_sql = (
            "UPDATE tasks SET status='running', worker_pid=?, "
            "claim_lock=?, started_at=? WHERE id=?"
        )
        update_args = (99999, f"{host_name}:w", int(claim_time), task_id)
        conn.execute(update_sql, update_args)
        conn.commit()

        # 3s elapsed < 5s window: the task must be left alone.
        monkeypatch.setattr(_kb.time, "time", lambda: claim_time + 3)
        assert task_id not in kb.detect_crashed_workers(conn)

        # 6s elapsed > 5s window: the task must be reclaimed.
        monkeypatch.setattr(_kb.time, "time", lambda: claim_time + 6)
        assert task_id in kb.detect_crashed_workers(conn)
|
||||||
|
|
||||||
|
|
||||||
|
def test_resolve_crash_grace_seconds_handles_bad_env(monkeypatch):
    """Unusable env values resolve to ``DEFAULT_CRASH_GRACE_SECONDS``.

    Covers a non-numeric string, a negative integer, and the empty string.
    """
    import hermes_cli.kanban_db as _kb

    unusable_values = ("notanumber", "-5", "")
    for raw in unusable_values:
        monkeypatch.setenv("HERMES_KANBAN_CRASH_GRACE_SECONDS", raw)
        resolved = _kb._resolve_crash_grace_seconds()
        assert resolved == _kb.DEFAULT_CRASH_GRACE_SECONDS, (
            f"expected default for {raw!r}, got {resolved}"
        )
|
||||||
|
|
||||||
|
|
||||||
def test_max_runtime_uses_current_run_start_after_retry(kanban_home, monkeypatch):
|
def test_max_runtime_uses_current_run_start_after_retry(kanban_home, monkeypatch):
|
||||||
"""A retry should get a fresh max-runtime window.
|
"""A retry should get a fresh max-runtime window.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue