From de9bcfc6a0500686dbf0cf907251d624e32bba4d Mon Sep 17 00:00:00 2001 From: bradhallett <53977268+bradhallett@users.noreply.github.com> Date: Mon, 18 May 2026 20:16:43 -0700 Subject: [PATCH] fix(kanban): fingerprint crash errors to prevent fleet-wide retry exhaustion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a systemic failure (provider outage, auth expiry, OOM) crashes multiple workers simultaneously, detect_crashed_workers increments each task failure counter independently. The circuit breaker only trips after N × failure_limit retries across the fleet. Fingerprint crash errors by normalizing host-specific details (PIDs, timestamps). When 3+ tasks crash with the same fingerprint in a single detection cycle, immediately trip the circuit breaker (failure_limit=1) instead of waiting for repeated failures. Isolated crashes (unique fingerprints) retain their normal retry budget. Protocol violations continue to trip immediately. Includes regression tests for systemic and isolated crash paths. --- hermes_cli/kanban_db.py | 35 +++++++++++------ tests/hermes_cli/test_kanban_db.py | 62 ++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 12 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 05af0b231c7..806c081246e 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -3684,18 +3684,29 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: # human with a clear reason than to loop ``DEFAULT_FAILURE_LIMIT`` # times first. auto_blocked: list[str] = [] - for tid, pid, claimer, protocol_violation, error_text in crash_details: - tripped = _record_task_failure( - conn, tid, - error=error_text, - outcome="crashed", - failure_limit=(1 if protocol_violation else None), - release_claim=False, - end_run=False, - event_payload_extra={"pid": pid, "claimer": claimer}, - ) - if tripped: - auto_blocked.append(tid) + if crash_details: + # Fingerprint errors to detect systemic failures. + _fp_counts: dict[str, int] = {} + for _, _, _, _, err_text in crash_details: + fp = _error_fingerprint(err_text) + _fp_counts[fp] = _fp_counts.get(fp, 0) + 1 + for tid, pid, claimer, protocol_violation, error_text in crash_details: + fp = _error_fingerprint(error_text) + is_systemic = ( + not protocol_violation + and _fp_counts.get(fp, 0) >= 3 + ) + tripped = _record_task_failure( + conn, tid, + error=error_text, + outcome="crashed", + failure_limit=1 if (protocol_violation or is_systemic) else None, + release_claim=False, + end_run=False, + event_payload_extra={"pid": pid, "claimer": claimer}, + ) + if tripped: + auto_blocked.append(tid) # Stash auto-blocked ids on the function for the dispatch loop to pick up. # Keeps the public return type (``list[str]``) stable for direct callers # and tests that destructure the result; ``dispatch_once`` reads this diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index b7ab8b06f39..f268bc6705f 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -305,6 +305,68 @@ def test_stale_claim_reclaim_event_records_diagnostic_payload( assert payload["host_local"] is True +def test_detect_crashed_workers_systemic_failure_fast_block( + kanban_home, monkeypatch, +): + """When many tasks crash with the same error, trip the breaker faster.""" + import hermes_cli.kanban_db as _kb + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + + with kb.connect() as conn: + task_ids = [] + for i in range(4): + tid = kb.create_task(conn, title=f"task-{i}", assignee="a") + host = _kb._claimer_id().split(":", 1)[0] + conn.execute( + "UPDATE tasks SET status='running', worker_pid=?, " + "claim_lock=? WHERE id=?", + (90000 + i, f"{host}:w{i}", tid), + ) + task_ids.append(tid) + conn.commit() + + crashed = kb.detect_crashed_workers(conn) + assert len(crashed) == 4 + + for tid in task_ids: + task = kb.get_task(conn, tid) + assert task.status == "blocked", ( + f"task {tid} should be blocked (systemic), got {task.status}" + ) + + +def test_detect_crashed_workers_isolated_failure_normal_retry( + kanban_home, monkeypatch, +): + """Below the systemic threshold, tasks retain normal retry budget.""" + import hermes_cli.kanban_db as _kb + + monkeypatch.setattr(_kb, "_pid_alive", lambda _pid: False) + + with kb.connect() as conn: + task_ids = [] + for i in range(2): + tid = kb.create_task(conn, title=f"iso-{i}", assignee="a") + host = _kb._claimer_id().split(":", 1)[0] + conn.execute( + "UPDATE tasks SET status='running', worker_pid=?, " + "claim_lock=? WHERE id=?", + (80000 + i, f"{host}:w{i}", tid), + ) + task_ids.append(tid) + conn.commit() + + crashed = kb.detect_crashed_workers(conn) + assert len(crashed) == 2 + + for tid in task_ids: + task = kb.get_task(conn, tid) + assert task.status == "ready", ( + f"task {tid} should stay ready (isolated), got {task.status}" + ) + + def test_max_runtime_uses_current_run_start_after_retry(kanban_home): """A retry should get a fresh max-runtime window.