diff --git a/gateway/run.py b/gateway/run.py index 7fda24614b..303e030177 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2521,7 +2521,12 @@ class GatewayRunner: platform_str, chat_id, e, ) - for platform, adapter in self.adapters.items(): + # Snapshot adapters up front: adapter.send() can hit a fatal error + # path that pops the adapter from self.adapters (see _handle_fatal + # elsewhere), which would otherwise trigger + # ``RuntimeError: dictionary changed size during iteration`` — + # observed in a user report during gateway shutdown. + for platform, adapter in list(self.adapters.items()): home = self.config.get_home_channel(platform) if not home or not home.chat_id: continue diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 94968dd87c..1c97d6beec 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -2618,6 +2618,77 @@ class DispatchResult: """Task ids whose workers exceeded ``max_runtime_seconds``.""" +# Bounded registry of recently-reaped worker child exits, populated by the +# reap loop at the top of ``dispatch_once`` and consulted by +# ``detect_crashed_workers`` to classify a dead-pid task. +# +# Entry: ``pid -> (raw_wait_status, reaped_at_epoch)``. We keep raw status +# so both ``os.WIFEXITED`` / ``os.WEXITSTATUS`` and ``os.WIFSIGNALED`` can +# be consulted. Entries are trimmed by age (and total size cap as a +# belt-and-braces against unbounded growth on exotic platforms). +_RECENT_WORKER_EXIT_TTL_SECONDS = 600 +_RECENT_WORKER_EXITS_MAX = 4096 +_recent_worker_exits: "dict[int, tuple[int, float]]" = {} + + +def _record_worker_exit(pid: int, raw_status: int) -> None: + """Record a reaped child's exit status for later classification. + + Called from the reap loop in ``dispatch_once``. Safe to call many + times; duplicate pids overwrite (pids can cycle, latest wins). + """ + if not pid or pid <= 0: + return + now = time.time() + _recent_worker_exits[int(pid)] = (int(raw_status), now) + # Age-based trim: drop entries older than the TTL. + if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX // 2: + cutoff = now - _RECENT_WORKER_EXIT_TTL_SECONDS + for _pid in [p for p, (_s, t) in _recent_worker_exits.items() if t < cutoff]: + _recent_worker_exits.pop(_pid, None) + # Size cap as a final guard. + if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX: + # Drop oldest half. + ordered = sorted(_recent_worker_exits.items(), key=lambda kv: kv[1][1]) + for _pid, _ in ordered[: len(ordered) // 2]: + _recent_worker_exits.pop(_pid, None) + + +def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]": + """Classify a recently-reaped worker by pid. + + Returns ``(kind, code)`` where ``kind`` is one of: + + * ``"clean_exit"`` — ``WIFEXITED`` with ``WEXITSTATUS == 0``. When the + task is still ``running`` in the DB, this is a protocol violation + (worker exited without calling ``kanban_complete`` / ``kanban_block``) + and should be auto-blocked immediately — retrying will just loop. + * ``"nonzero_exit"`` — ``WIFEXITED`` with non-zero status. Real error. + * ``"signaled"`` — ``WIFSIGNALED`` (OOM killer, SIGKILL, etc). Real crash. + * ``"unknown"`` — pid was not in the reap registry (either reaped by + something else, or died between reap tick and liveness check). Fall + back to existing crashed-counter behavior. + + ``code`` is the exit status (for ``clean_exit`` / ``nonzero_exit``) or + the signal number (for ``signaled``), or ``None`` for ``unknown``. + """ + entry = _recent_worker_exits.get(int(pid)) + if entry is None: + return ("unknown", None) + raw, _ = entry + try: + if os.WIFEXITED(raw): + code = os.WEXITSTATUS(raw) + if code == 0: + return ("clean_exit", 0) + return ("nonzero_exit", code) + if os.WIFSIGNALED(raw): + return ("signaled", os.WTERMSIG(raw)) + except Exception: + pass + return ("unknown", None) + + def _pid_alive(pid: Optional[int]) -> bool: """Return True if ``pid`` is still running on this host. @@ -2924,12 +2995,22 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: are meaningless here. The host-local check is enough because ``_default_spawn`` always runs the worker on the same host as the dispatcher (the whole design is single-host). + + When the reap registry shows the worker exited cleanly (rc=0) but + the task was still ``running`` in the DB, treat it as a protocol + violation (worker answered conversationally without calling + ``kanban_complete`` / ``kanban_block``) and trip the circuit breaker + on the first occurrence — retrying a worker whose CLI keeps + returning 0 without a terminal transition just loops forever. """ crashed: list[str] = [] # Per-crash details collected inside the main txn, used after it # closes to run ``_record_task_failure`` (which needs its own - # write_txn so can't nest). - crash_details: list[tuple[str, int, str]] = [] # (task_id, pid, claimer) + # write_txn so can't nest). ``protocol_violation`` flags the + # clean-exit-but-still-running case so we can trip the breaker + # immediately instead of incrementing by 1. + crash_details: list[tuple[str, int, str, bool, str]] = [] + # (task_id, pid, claimer, protocol_violation, error_text) with write_txn(conn): rows = conn.execute( "SELECT id, worker_pid, claim_lock FROM tasks " @@ -2943,6 +3024,39 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: continue if _pid_alive(row["worker_pid"]): continue + + pid = int(row["worker_pid"]) + kind, code = _classify_worker_exit(pid) + if kind == "clean_exit": + # Worker subprocess returned 0 but its task is still + # ``running`` in the DB — it exited without calling + # ``kanban_complete`` / ``kanban_block``. Retrying won't + # help. + protocol_violation = True + error_text = ( + "worker exited cleanly (rc=0) without calling " + "kanban_complete or kanban_block — protocol violation" + ) + event_kind = "protocol_violation" + event_payload = { + "pid": pid, + "claimer": row["claim_lock"], + "exit_code": code, + } + else: + protocol_violation = False + if kind == "nonzero_exit": + error_text = f"pid {pid} exited with code {code}" + elif kind == "signaled": + error_text = f"pid {pid} killed by signal {code}" + else: + error_text = f"pid {pid} not alive" + event_kind = "crashed" + event_payload = {"pid": pid, "claimer": row["claim_lock"]} + if code is not None and kind != "unknown": + event_payload["exit_kind"] = kind + event_payload["exit_code"] = code + cur = conn.execute( "UPDATE tasks SET status = 'ready', claim_lock = NULL, " "claim_expires = NULL, worker_pid = NULL " @@ -2953,34 +3067,47 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: run_id = _end_run( conn, row["id"], outcome="crashed", status="crashed", - error=f"pid {int(row['worker_pid'])} not alive", - metadata={ - "pid": int(row["worker_pid"]), - "claimer": row["claim_lock"], - }, + error=error_text, + metadata=dict(event_payload), ) _append_event( - conn, row["id"], "crashed", - {"pid": int(row["worker_pid"]), "claimer": row["claim_lock"]}, + conn, row["id"], event_kind, + event_payload, run_id=run_id, ) crashed.append(row["id"]) crash_details.append( - (row["id"], int(row["worker_pid"]), row["claim_lock"]) + (row["id"], pid, row["claim_lock"], + protocol_violation, error_text) ) # Outside the main txn: increment the unified failure counter for # each crashed task. If the breaker trips, the task transitions # ready → blocked with a ``gave_up`` event on top of the ``crashed`` # event we already emitted. - for tid, pid, claimer in crash_details: - _record_task_failure( + # + # Protocol-violation crashes force an immediate trip (failure_limit=1) + # because clean-exit-without-transition is deterministic: the next + # respawn will do exactly the same thing. Better to surface to a + # human with a clear reason than to loop ``DEFAULT_FAILURE_LIMIT`` + # times first. + auto_blocked: list[str] = [] + for tid, pid, claimer, protocol_violation, error_text in crash_details: + tripped = _record_task_failure( conn, tid, - error=f"pid {pid} not alive", + error=error_text, outcome="crashed", + failure_limit=(1 if protocol_violation else None), release_claim=False, end_run=False, event_payload_extra={"pid": pid, "claimer": claimer}, ) + if tripped: + auto_blocked.append(tid) + # Stash auto-blocked ids on the function for the dispatch loop to pick up. + # Keeps the public return type (``list[str]``) stable for direct callers + # and tests that destructure the result; ``dispatch_once`` reads this + # side-channel attribute to populate ``DispatchResult.auto_blocked``. + detect_crashed_workers._last_auto_blocked = auto_blocked # type: ignore[attr-defined] return crashed @@ -3242,6 +3369,12 @@ def dispatch_once( # exit. WNOHANG keeps this non-blocking; ChildProcessError means no # children to reap. Bounded: at most one tick's worth of completions # can be in at once. + # + # We also record the exit status keyed by pid, so + # ``detect_crashed_workers`` can distinguish a worker that exited + # cleanly without calling ``kanban_complete`` / ``kanban_block`` + # (protocol violation — auto-block) from a real crash (OOM killer, + # SIGKILL, non-zero exit — existing counter behavior). try: while True: try: @@ -3250,12 +3383,21 @@ def dispatch_once( break if _pid == 0: break + _record_worker_exit(_pid, _status) except Exception: pass result = DispatchResult() result.reclaimed = release_stale_claims(conn) result.crashed = detect_crashed_workers(conn) + # detect_crashed_workers stashes protocol-violation auto-blocks on + # itself so the public list-return stays stable. Pull them into the + # DispatchResult here so telemetry / tests see the trip. + _crash_auto_blocked = getattr( + detect_crashed_workers, "_last_auto_blocked", [] + ) + if _crash_auto_blocked: + result.auto_blocked.extend(_crash_auto_blocked) result.timed_out = enforce_max_runtime(conn) result.promoted = recompute_ready(conn) diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index a6d65f6072..306112c64a 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -3636,6 +3636,100 @@ def test_detect_crashed_workers_increments_counter(kanban_home): conn.close() +def test_detect_crashed_workers_protocol_violation_auto_blocks(kanban_home): + """A worker that exited rc=0 while its task was still ``running`` + is a protocol violation (agent answered conversationally without + calling kanban_complete / kanban_block). Retrying will just loop, + so auto-block immediately instead of waiting for the breaker to + trip at ``DEFAULT_FAILURE_LIMIT``. + + Regression test for the respawn-loop-after-completion bug reported + against small local models (gemma4-e2b q4) where the model writes + the answer as plain text and the CLI exits rc=0 cleanly. + """ + import hermes_cli.kanban_db as _kb + conn = kb.connect() + try: + tid = kb.create_task(conn, title="quiet", assignee="worker") + host_prefix = _kb._claimer_id().split(":", 1)[0] + lock = f"{host_prefix}:mock" + kb.claim_task(conn, tid, claimer=lock) + fake_pid = 999998 + kb._set_worker_pid(conn, tid, fake_pid) + + # Simulate the reap loop having recorded a clean exit for this pid. + # os.W_EXITCODE(status=0, signal=0) == 0 on POSIX. + _kb._record_worker_exit(fake_pid, 0) + # Force liveness check to say "dead" for the fake pid. + original_alive = _kb._pid_alive + _kb._pid_alive = lambda p: False + try: + result_crashed = kb.detect_crashed_workers(conn) + finally: + _kb._pid_alive = original_alive + + assert tid in result_crashed, "should be detected as crashed" + task = kb.get_task(conn, tid) + assert task.status == "blocked", ( + f"protocol violation should auto-block on first occurrence, " + f"got status={task.status}" + ) + assert "kanban_complete" in (task.last_failure_error or ""), ( + f"expected protocol-violation message, got {task.last_failure_error!r}" + ) + + events = kb.list_events(conn, tid) + kinds = [e.kind for e in events] + assert "protocol_violation" in kinds, ( + f"expected 'protocol_violation' event, got {kinds}" + ) + # The ``crashed`` event would be misleading here — the worker + # didn't crash, it returned 0. + assert "crashed" not in kinds, ( + f"should NOT emit 'crashed' event on clean exit, got {kinds}" + ) + assert "gave_up" in kinds, ( + f"breaker should trip, expected 'gave_up' event, got {kinds}" + ) + finally: + conn.close() + + +def test_detect_crashed_workers_nonzero_exit_uses_default_limit(kanban_home): + """A worker that exited non-zero (real error / crash) uses the + normal counter path — one failure doesn't trip the breaker. + """ + import hermes_cli.kanban_db as _kb + conn = kb.connect() + try: + tid = kb.create_task(conn, title="crashy", assignee="worker") + host_prefix = _kb._claimer_id().split(":", 1)[0] + kb.claim_task(conn, tid, claimer=f"{host_prefix}:mock") + fake_pid = 999997 + kb._set_worker_pid(conn, tid, fake_pid) + + # W_EXITCODE(1, 0) == 256 — WIFEXITED True, WEXITSTATUS == 1. + _kb._record_worker_exit(fake_pid, 256) + original_alive = _kb._pid_alive + _kb._pid_alive = lambda p: False + try: + kb.detect_crashed_workers(conn) + finally: + _kb._pid_alive = original_alive + + task = kb.get_task(conn, tid) + assert task.status == "ready", ( + f"single non-zero crash shouldn't auto-block, got {task.status}" + ) + assert task.consecutive_failures == 1 + events = kb.list_events(conn, tid) + kinds = [e.kind for e in events] + assert "crashed" in kinds + assert "protocol_violation" not in kinds + finally: + conn.close() + + def test_reclaim_task_clears_failure_counter(kanban_home): """Operator reclaim wipes the counter so the next retry gets a fresh budget."""