fix(kanban): fingerprint crash errors to prevent fleet-wide retry exhaustion

When a systemic failure (provider outage, auth expiry, OOM) crashes
multiple workers simultaneously, detect_crashed_workers increments
each task failure counter independently. The circuit breaker only
trips after N × failure_limit retries across the fleet.

Fingerprint crash errors by normalizing host-specific details (PIDs,
timestamps). When 3+ tasks crash with the same fingerprint in a
single detection cycle, immediately trip the circuit breaker
(failure_limit=1) instead of waiting for repeated failures.

Isolated crashes (unique fingerprints) retain their normal retry
budget. Protocol violations continue to trip immediately.

Includes regression tests for systemic and isolated crash paths.
This commit is contained in:
bradhallett 2026-05-18 20:16:43 -07:00 committed by Teknium
parent f042931852
commit de9bcfc6a0
2 changed files with 85 additions and 12 deletions

View file

@ -3684,18 +3684,29 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
# human with a clear reason than to loop ``DEFAULT_FAILURE_LIMIT``
# times first.
auto_blocked: list[str] = []
for tid, pid, claimer, protocol_violation, error_text in crash_details:
tripped = _record_task_failure(
conn, tid,
error=error_text,
outcome="crashed",
failure_limit=(1 if protocol_violation else None),
release_claim=False,
end_run=False,
event_payload_extra={"pid": pid, "claimer": claimer},
)
if tripped:
auto_blocked.append(tid)
if crash_details:
# Fingerprint errors to detect systemic failures.
_fp_counts: dict[str, int] = {}
for _, _, _, _, err_text in crash_details:
fp = _error_fingerprint(err_text)
_fp_counts[fp] = _fp_counts.get(fp, 0) + 1
for tid, pid, claimer, protocol_violation, error_text in crash_details:
fp = _error_fingerprint(error_text)
is_systemic = (
not protocol_violation
and _fp_counts.get(fp, 0) >= 3
)
tripped = _record_task_failure(
conn, tid,
error=error_text,
outcome="crashed",
failure_limit=1 if (protocol_violation or is_systemic) else None,
release_claim=False,
end_run=False,
event_payload_extra={"pid": pid, "claimer": claimer},
)
if tripped:
auto_blocked.append(tid)
# Stash auto-blocked ids on the function for the dispatch loop to pick up.
# Keeps the public return type (``list[str]``) stable for direct callers
# and tests that destructure the result; ``dispatch_once`` reads this

View file

@ -305,6 +305,68 @@ def test_stale_claim_reclaim_event_records_diagnostic_payload(
assert payload["host_local"] is True
def test_detect_crashed_workers_systemic_failure_fast_block(
kanban_home, monkeypatch,
):
"""When many tasks crash with the same error, trip the breaker faster."""
import hermes_cli.kanban_db as _kb
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
with kb.connect() as conn:
task_ids = []
for i in range(4):
tid = kb.create_task(conn, title=f"task-{i}", assignee="a")
host = _kb._claimer_id().split(":", 1)[0]
conn.execute(
"UPDATE tasks SET status='running', worker_pid=?, "
"claim_lock=? WHERE id=?",
(90000 + i, f"{host}:w{i}", tid),
)
task_ids.append(tid)
conn.commit()
crashed = kb.detect_crashed_workers(conn)
assert len(crashed) == 4
for tid in task_ids:
task = kb.get_task(conn, tid)
assert task.status == "blocked", (
f"task {tid} should be blocked (systemic), got {task.status}"
)
def test_detect_crashed_workers_isolated_failure_normal_retry(
kanban_home, monkeypatch,
):
"""Below the systemic threshold, tasks retain normal retry budget."""
import hermes_cli.kanban_db as _kb
monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False)
with kb.connect() as conn:
task_ids = []
for i in range(2):
tid = kb.create_task(conn, title=f"iso-{i}", assignee="a")
host = _kb._claimer_id().split(":", 1)[0]
conn.execute(
"UPDATE tasks SET status='running', worker_pid=?, "
"claim_lock=? WHERE id=?",
(80000 + i, f"{host}:w{i}", tid),
)
task_ids.append(tid)
conn.commit()
crashed = kb.detect_crashed_workers(conn)
assert len(crashed) == 2
for tid in task_ids:
task = kb.get_task(conn, tid)
assert task.status == "ready", (
f"task {tid} should stay ready (isolated), got {task.status}"
)
def test_max_runtime_uses_current_run_start_after_retry(kanban_home):
"""A retry should get a fresh max-runtime window.