diff --git a/gateway/run.py b/gateway/run.py index 219b564eb8..31fbd0a40a 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3645,6 +3645,24 @@ class GatewayRunner: if max_spawn is not None: logger.info(f"kanban dispatcher: max_spawn={max_spawn}") + raw_failure_limit = kanban_cfg.get("failure_limit", _kb.DEFAULT_FAILURE_LIMIT) + try: + failure_limit = int(raw_failure_limit) + except (TypeError, ValueError): + logger.warning( + "kanban dispatcher: invalid kanban.failure_limit=%r; using default %d", + raw_failure_limit, + _kb.DEFAULT_FAILURE_LIMIT, + ) + failure_limit = _kb.DEFAULT_FAILURE_LIMIT + if failure_limit < 1: + logger.warning( + "kanban dispatcher: kanban.failure_limit=%r is below 1; using default %d", + raw_failure_limit, + _kb.DEFAULT_FAILURE_LIMIT, + ) + failure_limit = _kb.DEFAULT_FAILURE_LIMIT + # Initial delay so the gateway finishes wiring adapters before the # dispatcher spawns workers (those workers may hit gateway notify # subscriptions etc.). Matches the notifier watcher's delay. @@ -3673,7 +3691,12 @@ class GatewayRunner: _kb.init_db(board=slug) # idempotent, handles first-run except Exception: pass - return _kb.dispatch_once(conn, board=slug, max_spawn=max_spawn) + return _kb.dispatch_once( + conn, + board=slug, + max_spawn=max_spawn, + failure_limit=failure_limit, + ) except Exception: logger.exception("kanban dispatcher: tick failed on board %s", slug) return None diff --git a/hermes_cli/config.py b/hermes_cli/config.py index cf2b0b528a..baf73c2ea5 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1230,6 +1230,10 @@ DEFAULT_CONFIG = { # Seconds between dispatcher ticks (idle or not). Lower = snappier # pickup of newly-ready tasks; higher = less SQL pressure. "dispatch_interval_seconds": 60, + # Auto-block after this many consecutive non-success attempts for the + # same task/profile (spawn_failed, timed_out, or crashed). Reassignment + # resets the streak for the new profile. + "failure_limit": 2, }, # execute_code settings — controls the tool used for programmatic tool calls. diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index d8bc47a7d7..7301e58b66 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -443,8 +443,8 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu help="Cap number of spawns this pass") p_disp.add_argument("--failure-limit", type=int, default=kb.DEFAULT_SPAWN_FAILURE_LIMIT, - help=f"Auto-block a task after this many consecutive spawn failures " - f"(default: {kb.DEFAULT_SPAWN_FAILURE_LIMIT})") + help=f"Auto-block a task after this many consecutive non-success attempts " + f"(spawn_failed, timed_out, or crashed; default: {kb.DEFAULT_SPAWN_FAILURE_LIMIT})") p_disp.add_argument("--json", action="store_true") # --- daemon (deprecated) --- @@ -1657,6 +1657,7 @@ def _cmd_daemon(args: argparse.Namespace) -> int: " kanban:\n" " dispatch_in_gateway: true # default\n" " dispatch_interval_seconds: 60\n" + " failure_limit: 2 # consecutive non-success attempts before auto-block\n" "\n" "Running both the gateway AND this standalone daemon will\n" "race for claims. If you truly need the old standalone\n" diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index d61eba1916..94968dd87c 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1380,7 +1380,7 @@ def assign_task(conn: sqlite3.Connection, task_id: str, profile: Optional[str]) profile = _canonical_assignee(profile) with write_txn(conn): row = conn.execute( - "SELECT status, claim_lock FROM tasks WHERE id = ?", (task_id,) + "SELECT status, claim_lock, assignee FROM tasks WHERE id = ?", (task_id,) ).fetchone() if not row: return False @@ -1389,7 +1389,17 @@ def assign_task(conn: sqlite3.Connection, task_id: str, profile: Optional[str]) f"cannot reassign {task_id}: currently running (claimed). " "Wait for completion or reclaim the stale lock first." ) - conn.execute("UPDATE tasks SET assignee = ? WHERE id = ?", (profile, task_id)) + if row["assignee"] != profile: + # The retry guard is scoped to the task/profile combination. A + # human reassigning the task is an explicit recovery action, so the + # new profile should not inherit the previous profile's streak. + conn.execute( + "UPDATE tasks SET assignee = ?, consecutive_failures = 0, " + "last_failure_error = NULL WHERE id = ?", + (profile, task_id), + ) + else: + conn.execute("UPDATE tasks SET assignee = ? WHERE id = ?", (profile, task_id)) _append_event(conn, task_id, "assigned", {"assignee": profile}) return True @@ -2569,11 +2579,11 @@ def set_workspace_path( # Dispatcher (one-shot pass) # --------------------------------------------------------------------------- -# After this many consecutive `spawn_failed` events on a task, the dispatcher -# stops retrying and parks the task in ``blocked`` with a reason so a human -# can investigate. Prevents the dispatcher from thrashing forever on a task -# whose profile doesn't exist, whose workspace is unmountable, etc. -DEFAULT_FAILURE_LIMIT = 5 +# After this many consecutive non-success attempts on a task/profile, the +# dispatcher stops retrying and parks the task in ``blocked`` with a reason so +# a human can investigate. Prevents retry storms when a worker repeatedly times +# out, crashes, or cannot spawn. +DEFAULT_FAILURE_LIMIT = 2 # Legacy alias — callers / tests still reference the old name. DEFAULT_SPAWN_FAILURE_LIMIT = DEFAULT_FAILURE_LIMIT diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index 613c230847..a6d65f6072 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -90,22 +90,20 @@ def test_spawn_failure_auto_blocks_after_limit(kanban_home, all_assignees_spawna conn = kb.connect() try: tid = kb.create_task(conn, title="x", assignee="worker") - # Three ticks below the default limit (5) → still ready, counter grows. - for i in range(3): - res = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) - assert tid not in res.auto_blocked + assert kb.DEFAULT_FAILURE_LIMIT == 2 + # One default-limit failure → still ready, counter grows. + res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn) + assert tid not in res1.auto_blocked task = kb.get_task(conn, tid) assert task.status == "ready" - assert task.consecutive_failures == 3 + assert task.consecutive_failures == 1 - # Two more ticks → fifth failure exceeds the limit. - res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) - assert tid not in res1.auto_blocked - res2 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) + # Second default-limit failure trips the guard. + res2 = kb.dispatch_once(conn, spawn_fn=_bad_spawn) assert tid in res2.auto_blocked task = kb.get_task(conn, tid) assert task.status == "blocked" - assert task.consecutive_failures >= 5 + assert task.consecutive_failures >= 2 assert task.last_failure_error and "no PATH" in task.last_failure_error finally: conn.close() @@ -170,6 +168,27 @@ def test_successful_completion_resets_failure_counter(kanban_home, all_assignees conn.close() +def test_reassign_resets_failure_counter_for_new_profile(kanban_home, all_assignees_spawnable): + """Retry streaks are scoped to a task/profile pair; reassigning is a + human recovery action and gives the new profile a fresh budget.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET consecutive_failures = 1, " + "last_failure_error = 'timed out' WHERE id = ?", + (tid,), + ) + assert kb.assign_task(conn, tid, "reviewer") is True + task = kb.get_task(conn, tid) + assert task.assignee == "reviewer" + assert task.consecutive_failures == 0 + assert task.last_failure_error is None + finally: + conn.close() + + def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spawnable): """`dir:` workspace with no path should fail workspace resolution AND count against the failure budget — not just crash the tick.""" @@ -719,6 +738,48 @@ def test_max_runtime_terminates_overrun_worker(kanban_home): _kb._pid_alive = original_alive +def test_repeated_timeouts_auto_block_at_default_limit(kanban_home): + """Two timed_out outcomes on the same task/profile trip the retry guard.""" + import hermes_cli.kanban_db as _kb + original_alive = _kb._pid_alive + _kb._pid_alive = lambda pid: False + + def _age_active_run(conn, tid): + old_started = int(time.time()) - 30 + with kb.write_txn(conn): + conn.execute( + "UPDATE task_runs SET started_at = ? " + "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)", + (old_started, tid), + ) + + try: + conn = kb.connect() + try: + tid = kb.create_task( + conn, title="long job", assignee="worker", + max_runtime_seconds=1, + ) + for expected_failures in (1, 2): + kb.claim_task(conn, tid) + kb._set_worker_pid(conn, tid, os.getpid()) + _age_active_run(conn, tid) + timed_out = kb.enforce_max_runtime(conn, signal_fn=lambda pid, sig: None) + assert tid in timed_out + task = kb.get_task(conn, tid) + assert task.consecutive_failures == expected_failures + task = kb.get_task(conn, tid) + assert task.status == "blocked" + events = kb.list_events(conn, tid) + assert [e.kind for e in events].count("timed_out") == 2 + gave_up = [e for e in events if e.kind == "gave_up"] + assert gave_up and gave_up[-1].payload["trigger_outcome"] == "timed_out" + finally: + conn.close() + finally: + _kb._pid_alive = original_alive + + def test_max_runtime_none_means_no_cap(kanban_home): """A task with max_runtime_seconds=None is never timed out regardless of how long it runs."""