fix: auto-block repeated kanban retries

This commit is contained in:
mwnickerson 2026-05-06 15:30:03 -04:00 committed by Teknium
parent 595e906698
commit 411cfa26e3
5 changed files with 119 additions and 20 deletions

View file

@@ -3645,6 +3645,24 @@ class GatewayRunner:
if max_spawn is not None: if max_spawn is not None:
logger.info(f"kanban dispatcher: max_spawn={max_spawn}") logger.info(f"kanban dispatcher: max_spawn={max_spawn}")
raw_failure_limit = kanban_cfg.get("failure_limit", _kb.DEFAULT_FAILURE_LIMIT)
try:
failure_limit = int(raw_failure_limit)
except (TypeError, ValueError):
logger.warning(
"kanban dispatcher: invalid kanban.failure_limit=%r; using default %d",
raw_failure_limit,
_kb.DEFAULT_FAILURE_LIMIT,
)
failure_limit = _kb.DEFAULT_FAILURE_LIMIT
if failure_limit < 1:
logger.warning(
"kanban dispatcher: kanban.failure_limit=%r is below 1; using default %d",
raw_failure_limit,
_kb.DEFAULT_FAILURE_LIMIT,
)
failure_limit = _kb.DEFAULT_FAILURE_LIMIT
# Initial delay so the gateway finishes wiring adapters before the # Initial delay so the gateway finishes wiring adapters before the
# dispatcher spawns workers (those workers may hit gateway notify # dispatcher spawns workers (those workers may hit gateway notify
# subscriptions etc.). Matches the notifier watcher's delay. # subscriptions etc.). Matches the notifier watcher's delay.
@@ -3673,7 +3691,12 @@ class GatewayRunner:
_kb.init_db(board=slug) # idempotent, handles first-run _kb.init_db(board=slug) # idempotent, handles first-run
except Exception: except Exception:
pass pass
return _kb.dispatch_once(conn, board=slug, max_spawn=max_spawn) return _kb.dispatch_once(
conn,
board=slug,
max_spawn=max_spawn,
failure_limit=failure_limit,
)
except Exception: except Exception:
logger.exception("kanban dispatcher: tick failed on board %s", slug) logger.exception("kanban dispatcher: tick failed on board %s", slug)
return None return None

View file

@@ -1230,6 +1230,10 @@ DEFAULT_CONFIG = {
# Seconds between dispatcher ticks (idle or not). Lower = snappier # Seconds between dispatcher ticks (idle or not). Lower = snappier
# pickup of newly-ready tasks; higher = less SQL pressure. # pickup of newly-ready tasks; higher = less SQL pressure.
"dispatch_interval_seconds": 60, "dispatch_interval_seconds": 60,
# Auto-block after this many consecutive non-success attempts for the
# same task/profile (spawn_failed, timed_out, or crashed). Reassignment
# resets the streak for the new profile.
"failure_limit": 2,
}, },
# execute_code settings — controls the tool used for programmatic tool calls. # execute_code settings — controls the tool used for programmatic tool calls.

View file

@@ -443,8 +443,8 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
help="Cap number of spawns this pass") help="Cap number of spawns this pass")
p_disp.add_argument("--failure-limit", type=int, p_disp.add_argument("--failure-limit", type=int,
default=kb.DEFAULT_SPAWN_FAILURE_LIMIT, default=kb.DEFAULT_SPAWN_FAILURE_LIMIT,
help=f"Auto-block a task after this many consecutive spawn failures " help=f"Auto-block a task after this many consecutive non-success attempts "
f"(default: {kb.DEFAULT_SPAWN_FAILURE_LIMIT})") f"(spawn_failed, timed_out, or crashed; default: {kb.DEFAULT_SPAWN_FAILURE_LIMIT})")
p_disp.add_argument("--json", action="store_true") p_disp.add_argument("--json", action="store_true")
# --- daemon (deprecated) --- # --- daemon (deprecated) ---
@@ -1657,6 +1657,7 @@ def _cmd_daemon(args: argparse.Namespace) -> int:
" kanban:\n" " kanban:\n"
" dispatch_in_gateway: true # default\n" " dispatch_in_gateway: true # default\n"
" dispatch_interval_seconds: 60\n" " dispatch_interval_seconds: 60\n"
" failure_limit: 2 # consecutive non-success attempts before auto-block\n"
"\n" "\n"
"Running both the gateway AND this standalone daemon will\n" "Running both the gateway AND this standalone daemon will\n"
"race for claims. If you truly need the old standalone\n" "race for claims. If you truly need the old standalone\n"

View file

@@ -1380,7 +1380,7 @@ def assign_task(conn: sqlite3.Connection, task_id: str, profile: Optional[str])
profile = _canonical_assignee(profile) profile = _canonical_assignee(profile)
with write_txn(conn): with write_txn(conn):
row = conn.execute( row = conn.execute(
"SELECT status, claim_lock FROM tasks WHERE id = ?", (task_id,) "SELECT status, claim_lock, assignee FROM tasks WHERE id = ?", (task_id,)
).fetchone() ).fetchone()
if not row: if not row:
return False return False
@@ -1389,7 +1389,17 @@ def assign_task(conn: sqlite3.Connection, task_id: str, profile: Optional[str])
f"cannot reassign {task_id}: currently running (claimed). " f"cannot reassign {task_id}: currently running (claimed). "
"Wait for completion or reclaim the stale lock first." "Wait for completion or reclaim the stale lock first."
) )
conn.execute("UPDATE tasks SET assignee = ? WHERE id = ?", (profile, task_id)) if row["assignee"] != profile:
# The retry guard is scoped to the task/profile combination. A
# human reassigning the task is an explicit recovery action, so the
# new profile should not inherit the previous profile's streak.
conn.execute(
"UPDATE tasks SET assignee = ?, consecutive_failures = 0, "
"last_failure_error = NULL WHERE id = ?",
(profile, task_id),
)
else:
conn.execute("UPDATE tasks SET assignee = ? WHERE id = ?", (profile, task_id))
_append_event(conn, task_id, "assigned", {"assignee": profile}) _append_event(conn, task_id, "assigned", {"assignee": profile})
return True return True
@@ -2569,11 +2579,11 @@ def set_workspace_path(
# Dispatcher (one-shot pass) # Dispatcher (one-shot pass)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# After this many consecutive `spawn_failed` events on a task, the dispatcher # After this many consecutive non-success attempts on a task/profile, the
# stops retrying and parks the task in ``blocked`` with a reason so a human # dispatcher stops retrying and parks the task in ``blocked`` with a reason so
# can investigate. Prevents the dispatcher from thrashing forever on a task # a human can investigate. Prevents retry storms when a worker repeatedly times
# whose profile doesn't exist, whose workspace is unmountable, etc. # out, crashes, or cannot spawn.
DEFAULT_FAILURE_LIMIT = 5 DEFAULT_FAILURE_LIMIT = 2
# Legacy alias — callers / tests still reference the old name. # Legacy alias — callers / tests still reference the old name.
DEFAULT_SPAWN_FAILURE_LIMIT = DEFAULT_FAILURE_LIMIT DEFAULT_SPAWN_FAILURE_LIMIT = DEFAULT_FAILURE_LIMIT

View file

@@ -90,22 +90,20 @@ def test_spawn_failure_auto_blocks_after_limit(kanban_home, all_assignees_spawna
conn = kb.connect() conn = kb.connect()
try: try:
tid = kb.create_task(conn, title="x", assignee="worker") tid = kb.create_task(conn, title="x", assignee="worker")
# Three ticks below the default limit (5) → still ready, counter grows. assert kb.DEFAULT_FAILURE_LIMIT == 2
for i in range(3): # One default-limit failure → still ready, counter grows.
res = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn)
assert tid not in res.auto_blocked assert tid not in res1.auto_blocked
task = kb.get_task(conn, tid) task = kb.get_task(conn, tid)
assert task.status == "ready" assert task.status == "ready"
assert task.consecutive_failures == 3 assert task.consecutive_failures == 1
# Two more ticks → fifth failure exceeds the limit. # Second default-limit failure trips the guard.
res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) res2 = kb.dispatch_once(conn, spawn_fn=_bad_spawn)
assert tid not in res1.auto_blocked
res2 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5)
assert tid in res2.auto_blocked assert tid in res2.auto_blocked
task = kb.get_task(conn, tid) task = kb.get_task(conn, tid)
assert task.status == "blocked" assert task.status == "blocked"
assert task.consecutive_failures >= 5 assert task.consecutive_failures >= 2
assert task.last_failure_error and "no PATH" in task.last_failure_error assert task.last_failure_error and "no PATH" in task.last_failure_error
finally: finally:
conn.close() conn.close()
@@ -170,6 +168,27 @@ def test_successful_completion_resets_failure_counter(kanban_home, all_assignees
conn.close() conn.close()
def test_reassign_resets_failure_counter_for_new_profile(kanban_home, all_assignees_spawnable):
"""Retry streaks are scoped to a task/profile pair; reassigning is a
human recovery action and gives the new profile a fresh budget."""
conn = kb.connect()
try:
tid = kb.create_task(conn, title="x", assignee="worker")
with kb.write_txn(conn):
conn.execute(
"UPDATE tasks SET consecutive_failures = 1, "
"last_failure_error = 'timed out' WHERE id = ?",
(tid,),
)
assert kb.assign_task(conn, tid, "reviewer") is True
task = kb.get_task(conn, tid)
assert task.assignee == "reviewer"
assert task.consecutive_failures == 0
assert task.last_failure_error is None
finally:
conn.close()
def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spawnable): def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spawnable):
"""`dir:` workspace with no path should fail workspace resolution AND """`dir:` workspace with no path should fail workspace resolution AND
count against the failure budget not just crash the tick.""" count against the failure budget not just crash the tick."""
@@ -719,6 +738,48 @@ def test_max_runtime_terminates_overrun_worker(kanban_home):
_kb._pid_alive = original_alive _kb._pid_alive = original_alive
def test_repeated_timeouts_auto_block_at_default_limit(kanban_home):
"""Two timed_out outcomes on the same task/profile trip the retry guard."""
import hermes_cli.kanban_db as _kb
original_alive = _kb._pid_alive
_kb._pid_alive = lambda pid: False
def _age_active_run(conn, tid):
old_started = int(time.time()) - 30
with kb.write_txn(conn):
conn.execute(
"UPDATE task_runs SET started_at = ? "
"WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
(old_started, tid),
)
try:
conn = kb.connect()
try:
tid = kb.create_task(
conn, title="long job", assignee="worker",
max_runtime_seconds=1,
)
for expected_failures in (1, 2):
kb.claim_task(conn, tid)
kb._set_worker_pid(conn, tid, os.getpid())
_age_active_run(conn, tid)
timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda pid, sig: None)
assert tid in timed_out
task = kb.get_task(conn, tid)
assert task.consecutive_failures == expected_failures
task = kb.get_task(conn, tid)
assert task.status == "blocked"
events = kb.list_events(conn, tid)
assert [e.kind for e in events].count("timed_out") == 2
gave_up = [e for e in events if e.kind == "gave_up"]
assert gave_up and gave_up[-1].payload["trigger_outcome"] == "timed_out"
finally:
conn.close()
finally:
_kb._pid_alive = original_alive
def test_max_runtime_none_means_no_cap(kanban_home): def test_max_runtime_none_means_no_cap(kanban_home):
"""A task with max_runtime_seconds=None is never timed out regardless """A task with max_runtime_seconds=None is never timed out regardless
of how long it runs.""" of how long it runs."""