fix(kanban): auto-block workers that exit without completing (#20894) (#21214)

When a kanban worker subprocess exits rc=0 but its task is still in
status='running', the agent almost certainly answered the task
conversationally without calling kanban_complete or kanban_block. The
dispatcher used to classify this as a generic crash and respawn, which
loops forever on small local models (gemma4-e2b q4 etc.) that keep
returning clean but unproductive output.

Dispatcher changes:
- The waitpid reap loop at the top of dispatch_once now records each
  reaped child's raw exit status in a bounded module registry
  (_recent_worker_exits, TTL 600s, size cap 4096).
- _classify_worker_exit distinguishes clean_exit / nonzero_exit /
  signaled / unknown using os.WIFEXITED / WIFSIGNALED (sketched after
  this list).
- detect_crashed_workers consults the classification when a worker
  is found dead. clean_exit → protocol_violation event + immediate
  circuit-breaker trip (failure_limit=1). Everything else keeps the
  existing crashed-event + counter behavior.
- DispatchResult.auto_blocked now includes protocol-violation trips.
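
A condensed sketch of the reap-and-classify mechanism (the helper names
here are illustrative; the real helpers, _record_worker_exit and
_classify_worker_exit, plus TTL and size-cap trimming of the registry,
are in the diff below):

    import os

    _exits: dict[int, int] = {}  # pid -> raw waitpid() status

    def reap_once() -> None:
        # Non-blocking reap of every child that finished since the last tick.
        while True:
            try:
                pid, raw = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:   # no child processes at all
                return
            if pid == 0:                # children exist, but none have exited
                return
            _exits[pid] = raw

    def classify(pid: int) -> "tuple[str, int | None]":
        raw = _exits.get(pid)
        if raw is None:
            return ("unknown", None)
        if os.WIFEXITED(raw):
            code = os.WEXITSTATUS(raw)
            return ("clean_exit" if code == 0 else "nonzero_exit", code)
        if os.WIFSIGNALED(raw):
            return ("signaled", os.WTERMSIG(raw))
        return ("unknown", None)

A "clean_exit" for a task that is still running in the DB is what trips
the breaker immediately (failure_limit=1); the other kinds keep the
existing counter behavior.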

Gateway fix (Bug A in #20894):
- gateway.run._notify_active_sessions_of_shutdown snapshots
  self.adapters with list(...) before iterating. adapter.send() can
  hit a fatal-error path that pops the adapter from the dict, which
  was raising 'RuntimeError: dictionary changed size during iteration'
  during shutdown (sketched below).
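
A minimal sketch of the pattern, assuming self.adapters maps session ids
to adapter objects (iterating .values() and the payload shape are
assumptions; the fix itself is just the list(...) snapshot):

    def _notify_active_sessions_of_shutdown(self) -> None:
        # list(...) snapshots the mapping: adapter.send() can pop the
        # adapter out of self.adapters on a fatal error, and mutating a
        # dict while iterating it raises RuntimeError.
        for adapter in list(self.adapters.values()):
            adapter.send({"type": "shutdown"})  # payload shape is illustrative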

Regression tests:
- test_detect_crashed_workers_protocol_violation_auto_blocks verifies
  rc=0 + still-running → status=blocked on first occurrence with
  protocol_violation + gave_up events and NO crashed event (sketched below).
- test_detect_crashed_workers_nonzero_exit_uses_default_limit verifies
  non-zero exits keep the existing 2-strike behavior.
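
Rough shape of the first test (the conn fixture and the task/event
helpers used here are hypothetical; only _record_worker_exit,
detect_crashed_workers, and the expected status/events come from the
change itself):

    def test_detect_crashed_workers_protocol_violation_auto_blocks(conn):
        pid = spawn_short_lived_child()                    # hypothetical helper
        task_id = make_running_task(conn, worker_pid=pid)  # hypothetical helper
        _record_worker_exit(pid, 0)     # raw wait status 0 == clean rc=0 exit
        assert detect_crashed_workers(conn) == [task_id]
        assert task_status(conn, task_id) == "blocked"     # hypothetical helper
        kinds = {e["kind"] for e in task_events(conn, task_id)}  # hypothetical
        assert {"protocol_violation", "gave_up"} <= kinds
        assert "crashed" not in kinds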

Closes #20894.
Teknium 2026-05-07 05:24:16 -07:00 committed by GitHub
parent 699c770e5c
commit fdb9e0f6a6
3 changed files with 255 additions and 14 deletions


@@ -2618,6 +2618,77 @@ class DispatchResult:
    """Task ids whose workers exceeded ``max_runtime_seconds``."""


# Bounded registry of recently-reaped worker child exits, populated by the
# reap loop at the top of ``dispatch_once`` and consulted by
# ``detect_crashed_workers`` to classify a dead-pid task.
#
# Entry: ``pid -> (raw_wait_status, reaped_at_epoch)``. We keep raw status
# so both ``os.WIFEXITED`` / ``os.WEXITSTATUS`` and ``os.WIFSIGNALED`` can
# be consulted. Entries are trimmed by age (and a total size cap as a
# belt-and-braces against unbounded growth on exotic platforms).
_RECENT_WORKER_EXIT_TTL_SECONDS = 600
_RECENT_WORKER_EXITS_MAX = 4096
_recent_worker_exits: "dict[int, tuple[int, float]]" = {}


def _record_worker_exit(pid: int, raw_status: int) -> None:
    """Record a reaped child's exit status for later classification.

    Called from the reap loop in ``dispatch_once``. Safe to call many
    times; duplicate pids overwrite (pids can cycle, latest wins).
    """
    if not pid or pid <= 0:
        return
    now = time.time()
    _recent_worker_exits[int(pid)] = (int(raw_status), now)
    # Age-based trim: drop entries older than the TTL.
    if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX // 2:
        cutoff = now - _RECENT_WORKER_EXIT_TTL_SECONDS
        for _pid in [p for p, (_s, t) in _recent_worker_exits.items() if t < cutoff]:
            _recent_worker_exits.pop(_pid, None)
    # Size cap as a final guard.
    if len(_recent_worker_exits) > _RECENT_WORKER_EXITS_MAX:
        # Drop oldest half.
        ordered = sorted(_recent_worker_exits.items(), key=lambda kv: kv[1][1])
        for _pid, _ in ordered[: len(ordered) // 2]:
            _recent_worker_exits.pop(_pid, None)


def _classify_worker_exit(pid: int) -> "tuple[str, Optional[int]]":
    """Classify a recently-reaped worker by pid.

    Returns ``(kind, code)`` where ``kind`` is one of:

    * ``"clean_exit"``: ``WIFEXITED`` with ``WEXITSTATUS == 0``. When the
      task is still ``running`` in the DB, this is a protocol violation
      (worker exited without calling ``kanban_complete`` / ``kanban_block``)
      and should be auto-blocked immediately; retrying will just loop.
    * ``"nonzero_exit"``: ``WIFEXITED`` with non-zero status. Real error.
    * ``"signaled"``: ``WIFSIGNALED`` (OOM killer, SIGKILL, etc.). Real crash.
    * ``"unknown"``: pid was not in the reap registry (either reaped by
      something else, or died between reap tick and liveness check). Fall
      back to existing crashed-counter behavior.

    ``code`` is the exit status (for ``clean_exit`` / ``nonzero_exit``) or
    the signal number (for ``signaled``), or ``None`` for ``unknown``.
    """
    entry = _recent_worker_exits.get(int(pid))
    if entry is None:
        return ("unknown", None)
    raw, _ = entry
    try:
        if os.WIFEXITED(raw):
            code = os.WEXITSTATUS(raw)
            if code == 0:
                return ("clean_exit", 0)
            return ("nonzero_exit", code)
        if os.WIFSIGNALED(raw):
            return ("signaled", os.WTERMSIG(raw))
    except Exception:
        pass
    return ("unknown", None)


def _pid_alive(pid: Optional[int]) -> bool:
    """Return True if ``pid`` is still running on this host.
@@ -2924,12 +2995,22 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
    are meaningless here. The host-local check is enough because
    ``_default_spawn`` always runs the worker on the same host as the
    dispatcher (the whole design is single-host).

    When the reap registry shows the worker exited cleanly (rc=0) but
    the task was still ``running`` in the DB, treat it as a protocol
    violation (worker answered conversationally without calling
    ``kanban_complete`` / ``kanban_block``) and trip the circuit breaker
    on the first occurrence; retrying a worker whose CLI keeps
    returning 0 without a terminal transition just loops forever.
    """
    crashed: list[str] = []
    # Per-crash details collected inside the main txn, used after it
    # closes to run ``_record_task_failure`` (which needs its own
    # write_txn so can't nest).
    crash_details: list[tuple[str, int, str]] = []  # (task_id, pid, claimer)
    # write_txn so can't nest). ``protocol_violation`` flags the
    # clean-exit-but-still-running case so we can trip the breaker
    # immediately instead of incrementing by 1.
    crash_details: list[tuple[str, int, str, bool, str]] = []
    # (task_id, pid, claimer, protocol_violation, error_text)
    with write_txn(conn):
        rows = conn.execute(
            "SELECT id, worker_pid, claim_lock FROM tasks "
@@ -2943,6 +3024,39 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
                continue
            if _pid_alive(row["worker_pid"]):
                continue
            pid = int(row["worker_pid"])
            kind, code = _classify_worker_exit(pid)
            if kind == "clean_exit":
                # Worker subprocess returned 0 but its task is still
                # ``running`` in the DB — it exited without calling
                # ``kanban_complete`` / ``kanban_block``. Retrying won't
                # help.
                protocol_violation = True
                error_text = (
                    "worker exited cleanly (rc=0) without calling "
                    "kanban_complete or kanban_block — protocol violation"
                )
                event_kind = "protocol_violation"
                event_payload = {
                    "pid": pid,
                    "claimer": row["claim_lock"],
                }
            else:
                protocol_violation = False
                if kind == "nonzero_exit":
                    error_text = f"pid {pid} exited with code {code}"
                elif kind == "signaled":
                    error_text = f"pid {pid} killed by signal {code}"
                else:
                    error_text = f"pid {pid} not alive"
                event_kind = "crashed"
                event_payload = {"pid": pid, "claimer": row["claim_lock"]}
                if code is not None and kind != "unknown":
                    event_payload["exit_kind"] = kind
                    event_payload["exit_code"] = code
            cur = conn.execute(
                "UPDATE tasks SET status = 'ready', claim_lock = NULL, "
                "claim_expires = NULL, worker_pid = NULL "
@@ -2953,34 +3067,47 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
            run_id = _end_run(
                conn, row["id"],
                outcome="crashed", status="crashed",
                error=f"pid {int(row['worker_pid'])} not alive",
                metadata={
                    "pid": int(row["worker_pid"]),
                    "claimer": row["claim_lock"],
                },
                error=error_text,
                metadata=dict(event_payload),
            )
            _append_event(
                conn, row["id"], "crashed",
                {"pid": int(row["worker_pid"]), "claimer": row["claim_lock"]},
                conn, row["id"], event_kind,
                event_payload,
                run_id=run_id,
            )
            crashed.append(row["id"])
            crash_details.append(
                (row["id"], int(row["worker_pid"]), row["claim_lock"])
                (row["id"], pid, row["claim_lock"],
                 protocol_violation, error_text)
            )
    # Outside the main txn: increment the unified failure counter for
    # each crashed task. If the breaker trips, the task transitions
    # ready → blocked with a ``gave_up`` event on top of the ``crashed``
    # event we already emitted.
    for tid, pid, claimer in crash_details:
        _record_task_failure(
    #
    # Protocol-violation crashes force an immediate trip (failure_limit=1)
    # because clean-exit-without-transition is deterministic: the next
    # respawn will do exactly the same thing. Better to surface to a
    # human with a clear reason than to loop ``DEFAULT_FAILURE_LIMIT``
    # times first.
    auto_blocked: list[str] = []
    for tid, pid, claimer, protocol_violation, error_text in crash_details:
        tripped = _record_task_failure(
            conn, tid,
            error=f"pid {pid} not alive",
            error=error_text,
            outcome="crashed",
            failure_limit=(1 if protocol_violation else None),
            release_claim=False,
            end_run=False,
            event_payload_extra={"pid": pid, "claimer": claimer},
        )
        if tripped:
            auto_blocked.append(tid)
    # Stash auto-blocked ids on the function for the dispatch loop to pick up.
    # Keeps the public return type (``list[str]``) stable for direct callers
    # and tests that destructure the result; ``dispatch_once`` reads this
    # side-channel attribute to populate ``DispatchResult.auto_blocked``.
    detect_crashed_workers._last_auto_blocked = auto_blocked  # type: ignore[attr-defined]
    return crashed
@@ -3242,6 +3369,12 @@ def dispatch_once(
    # exit. WNOHANG keeps this non-blocking; ChildProcessError means no
    # children to reap. Bounded: at most one tick's worth of completions
    # can be in <defunct> at once.
    #
    # We also record the exit status keyed by pid, so
    # ``detect_crashed_workers`` can distinguish a worker that exited
    # cleanly without calling ``kanban_complete`` / ``kanban_block``
    # (protocol violation — auto-block) from a real crash (OOM killer,
    # SIGKILL, non-zero exit — existing counter behavior).
    try:
        while True:
            try:
@@ -3250,12 +3383,21 @@ def dispatch_once(
                break
            if _pid == 0:
                break
            _record_worker_exit(_pid, _status)
    except Exception:
        pass

    result = DispatchResult()
    result.reclaimed = release_stale_claims(conn)
    result.crashed = detect_crashed_workers(conn)
    # detect_crashed_workers stashes protocol-violation auto-blocks on
    # itself so the public list-return stays stable. Pull them into the
    # DispatchResult here so telemetry / tests see the trip.
    _crash_auto_blocked = getattr(
        detect_crashed_workers, "_last_auto_blocked", []
    )
    if _crash_auto_blocked:
        result.auto_blocked.extend(_crash_auto_blocked)
    result.timed_out = enforce_max_runtime(conn)
    result.promoted = recompute_ready(conn)