diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 7301e58b66..59e44795f3 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -70,6 +70,7 @@ def _task_to_dict(t: kb.Task) -> dict[str, Any]: "completed_at": t.completed_at, "result": t.result, "skills": list(t.skills) if t.skills else [], + "max_retries": t.max_retries, } @@ -284,6 +285,15 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu "(repeatable). Appended to the built-in " "kanban-worker skill. Example: " "--skill translation --skill github-code-review") + p_create.add_argument("--max-retries", type=int, default=None, + metavar="N", + help="Per-task override for the consecutive-failure " + "circuit breaker. Trip on the Nth failure — " + "e.g. --max-retries 1 blocks on the first " + "failure (no retries), --max-retries 3 allows " + "two retries. Omit to use the dispatcher's " + "kanban.failure_limit config " + f"(default {kb.DEFAULT_FAILURE_LIMIT}).") p_create.add_argument("--json", action="store_true", help="Emit JSON output") # --- list --- @@ -982,6 +992,14 @@ def _cmd_create(args: argparse.Namespace) -> int: except ValueError as exc: print(f"kanban: --max-runtime: {exc}", file=sys.stderr) return 2 + max_retries = getattr(args, "max_retries", None) + if max_retries is not None and max_retries < 1: + print( + f"kanban: --max-retries must be >= 1 (got {max_retries}); " + "use 1 to trip on the first failure.", + file=sys.stderr, + ) + return 2 with kb.connect() as conn: task_id = kb.create_task( conn, @@ -998,6 +1016,7 @@ def _cmd_create(args: argparse.Namespace) -> int: idempotency_key=getattr(args, "idempotency_key", None), max_runtime_seconds=max_runtime, skills=getattr(args, "skills", None) or None, + max_retries=max_retries, ) task = kb.get_task(conn, task_id) if getattr(args, "json", False): @@ -1125,6 +1144,23 @@ def _cmd_show(args: argparse.Namespace) -> int: (f" @ {task.workspace_path}" if task.workspace_path else "")) if task.skills: print(f" skills: {', '.join(task.skills)}") + # Effective retry threshold. Show the per-task override if set, + # otherwise the dispatcher's resolved value from config (or the + # default if config doesn't set it either). Helps operators see + # why a task auto-blocked earlier/later than they expected. + if task.max_retries is not None: + print(f" max-retries: {task.max_retries} (task)") + else: + try: + from hermes_cli.config import load_config + cfg = load_config() + cfg_val = (cfg.get("kanban", {}) or {}).get("failure_limit") + except Exception: + cfg_val = None + if cfg_val is not None and int(cfg_val) != kb.DEFAULT_FAILURE_LIMIT: + print(f" max-retries: {int(cfg_val)} (config kanban.failure_limit)") + else: + print(f" max-retries: {kb.DEFAULT_FAILURE_LIMIT} (default)") print(f" created: {_fmt_ts(task.created_at)} by {task.created_by or '-'}") # Diagnostics section — surface active distress signals at the top diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 1c97d6beec..920e23e403 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -595,6 +595,14 @@ class Task: # JSON array of skill names. None = use only the defaults; empty # list = explicitly no extra skills. skills: Optional[list] = None + # Per-task override for the consecutive-failure circuit breaker. + # The value is the failure count at which the breaker trips — e.g. + # ``max_retries=1`` blocks on the first failure (zero retries), + # ``max_retries=3`` blocks on the third (two retries allowed). + # ``None`` (the common case) falls through to the dispatcher-level + # ``kanban.failure_limit`` config, and then to ``DEFAULT_FAILURE_LIMIT``. + # Name matches the ``--max-retries`` CLI flag on ``kanban create``. + max_retries: Optional[int] = None @classmethod def from_row(cls, row: sqlite3.Row) -> "Task": @@ -656,6 +664,9 @@ class Task: row["current_step_key"] if "current_step_key" in keys else None ), skills=skills_value, + max_retries=( + row["max_retries"] if "max_retries" in keys else None + ), ) @@ -776,7 +787,13 @@ CREATE TABLE IF NOT EXISTS tasks ( -- Force-loaded skills for the worker on this task, stored as JSON. -- Appended to the dispatcher's built-in `--skills kanban-worker`. -- NULL or empty array = no extras. - skills TEXT + skills TEXT, + -- Per-task override for the consecutive-failure circuit breaker. + -- The value is the failure count at which the breaker trips — e.g. + -- ``max_retries=1`` blocks on the first failure. NULL (the common + -- case) falls through to the dispatcher-level ``kanban.failure_limit`` + -- config and then ``DEFAULT_FAILURE_LIMIT``. + max_retries INTEGER ); CREATE TABLE IF NOT EXISTS task_links ( @@ -1008,6 +1025,14 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: # for existing rows. conn.execute("ALTER TABLE tasks ADD COLUMN skills TEXT") + if "max_retries" not in cols: + # Per-task override for the consecutive-failure circuit breaker. + # NULL = fall through to the dispatcher-level ``kanban.failure_limit`` + # config, then ``DEFAULT_FAILURE_LIMIT``. Existing rows get NULL, + # which is the correct default (they keep the global behaviour + # they were getting before the column existed). + conn.execute("ALTER TABLE tasks ADD COLUMN max_retries INTEGER") + # task_events gained a run_id column; back-fill it as NULL for # historical events (they predate runs and can't be attributed). ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")} @@ -1163,6 +1188,7 @@ def create_task( idempotency_key: Optional[str] = None, max_runtime_seconds: Optional[int] = None, skills: Optional[Iterable[str]] = None, + max_retries: Optional[int] = None, ) -> str: """Create a new task and optionally link it under parent tasks. @@ -1276,8 +1302,9 @@ def create_task( INSERT INTO tasks ( id, title, body, assignee, status, priority, created_by, created_at, workspace_kind, workspace_path, - tenant, idempotency_key, max_runtime_seconds, skills - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + tenant, idempotency_key, max_runtime_seconds, skills, + max_retries + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( task_id, @@ -1294,6 +1321,7 @@ def create_task( idempotency_key, int(max_runtime_seconds) if max_runtime_seconds else None, json.dumps(skills_list) if skills_list is not None else None, + int(max_retries) if max_retries is not None else None, ), ) for pid in parents: @@ -3149,20 +3177,39 @@ def _record_task_failure( ``event_payload_extra`` merges into the ``gave_up`` event payload when the breaker trips, so callers can include outcome-specific context (e.g. pid on crash, elapsed on timeout). + + Resolution order for the effective threshold: + 1. per-task ``max_retries`` if set (nothing else overrides) + 2. caller-supplied ``failure_limit`` (gateway passes the config + value from ``kanban.failure_limit``; tests pass fixed values) + 3. ``DEFAULT_FAILURE_LIMIT`` """ if failure_limit is None: failure_limit = DEFAULT_FAILURE_LIMIT blocked = False with write_txn(conn): row = conn.execute( - "SELECT consecutive_failures, status FROM tasks WHERE id = ?", (task_id,), + "SELECT consecutive_failures, status, max_retries " + "FROM tasks WHERE id = ?", (task_id,), ).fetchone() if row is None: return False failures = int(row["consecutive_failures"]) + 1 cur_status = row["status"] - if failures >= failure_limit: + # Per-task override wins over both caller-supplied and default + # thresholds. None (the common case) falls through. + task_override = ( + row["max_retries"] if "max_retries" in row.keys() else None + ) + if task_override is not None: + effective_limit = int(task_override) + limit_source = "task" + else: + effective_limit = int(failure_limit) + limit_source = "dispatcher" + + if failures >= effective_limit: # Trip the breaker. if release_claim: # Spawn path: still running, also clear claim state. @@ -3190,10 +3237,17 @@ def _record_task_failure( conn, task_id, outcome="gave_up", status="gave_up", error=error[:500], - metadata={"failures": failures, "trigger_outcome": outcome}, + metadata={ + "failures": failures, + "trigger_outcome": outcome, + "effective_limit": effective_limit, + "limit_source": limit_source, + }, ) payload = { "failures": failures, + "effective_limit": effective_limit, + "limit_source": limit_source, "error": error[:500], "trigger_outcome": outcome, } diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index 306112c64a..45d457630e 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -189,6 +189,137 @@ def test_reassign_resets_failure_counter_for_new_profile(kanban_home, all_assign conn.close() +def test_per_task_max_retries_overrides_dispatcher_limit(kanban_home, all_assignees_spawnable): + """Per-task ``max_retries`` overrides both the caller-supplied + ``failure_limit`` (gateway config) and the hardcoded default. + + Three-tier resolution order: + 1. ``task.max_retries`` (set via ``create_task(max_retries=N)`` / + ``hermes kanban create --max-retries N``) + 2. ``failure_limit`` kwarg passed by the caller (gateway threads + this from ``kanban.failure_limit`` config) + 3. ``DEFAULT_FAILURE_LIMIT`` + """ + conn = kb.connect() + try: + # max_retries=1 should trip on the FIRST failure, even though the + # caller is asking for failure_limit=10. + tid = kb.create_task( + conn, title="one-shot", assignee="worker", max_retries=1, + ) + task = kb.get_task(conn, tid) + assert task.max_retries == 1, "per-task override must persist" + + kb.claim_task(conn, tid) + tripped = kb._record_task_failure( + conn, tid, + error="first fail", + outcome="spawn_failed", + failure_limit=10, # far higher than per-task override + release_claim=True, + end_run=False, + ) + assert tripped is True, "should auto-block on first failure" + task = kb.get_task(conn, tid) + assert task.status == "blocked" + assert task.consecutive_failures == 1 + + # gave_up event should record where the threshold came from + events = kb.list_events(conn, tid) + gave_up = [e for e in events if e.kind == "gave_up"] + assert gave_up, f"expected gave_up event, got {[e.kind for e in events]}" + assert gave_up[-1].payload.get("limit_source") == "task" + assert gave_up[-1].payload.get("effective_limit") == 1 + finally: + conn.close() + + +def test_per_task_max_retries_allows_more_than_default(kanban_home, all_assignees_spawnable): + """A task with ``max_retries=5`` does NOT auto-block at the default + limit of 2 — it must reach the per-task override first.""" + conn = kb.connect() + try: + tid = kb.create_task( + conn, title="flaky-retry", assignee="worker", max_retries=5, + ) + # Four failures — still below the per-task threshold, should stay ready. + for i in range(1, 5): + kb.claim_task(conn, tid) + tripped = kb._record_task_failure( + conn, tid, + error=f"fail {i}", + outcome="spawn_failed", + # Caller passes the default so the dispatcher tier matches + # ``DEFAULT_FAILURE_LIMIT``; without the per-task override + # the breaker would have tripped at failure 2. + release_claim=True, + end_run=False, + ) + assert tripped is False, f"shouldn't trip at failure {i} with max_retries=5" + task = kb.get_task(conn, tid) + assert task.status == "ready", f"at failure {i} status was {task.status}" + + # Fifth failure trips the per-task limit. + kb.claim_task(conn, tid) + tripped = kb._record_task_failure( + conn, tid, + error="fail 5", + outcome="spawn_failed", + release_claim=True, + end_run=False, + ) + assert tripped is True + task = kb.get_task(conn, tid) + assert task.status == "blocked" + assert task.consecutive_failures == 5 + finally: + conn.close() + + +def test_max_retries_none_falls_through_to_dispatcher_limit(kanban_home, all_assignees_spawnable): + """``max_retries=None`` (the default) falls through to the caller- + supplied ``failure_limit`` — the gateway config tier.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="standard", assignee="worker") + task = kb.get_task(conn, tid) + assert task.max_retries is None + + # Caller passes failure_limit=4 (simulates kanban.failure_limit=4). + # Should trip at 4, not at the DEFAULT_FAILURE_LIMIT of 2. + for i in range(1, 4): + kb.claim_task(conn, tid) + tripped = kb._record_task_failure( + conn, tid, + error=f"fail {i}", + outcome="spawn_failed", + failure_limit=4, + release_claim=True, + end_run=False, + ) + assert tripped is False, f"premature trip at failure {i}" + + kb.claim_task(conn, tid) + tripped = kb._record_task_failure( + conn, tid, + error="fail 4", + outcome="spawn_failed", + failure_limit=4, + release_claim=True, + end_run=False, + ) + assert tripped is True + task = kb.get_task(conn, tid) + assert task.status == "blocked" + + events = kb.list_events(conn, tid) + gave_up = [e for e in events if e.kind == "gave_up"] + assert gave_up[-1].payload.get("limit_source") == "dispatcher" + assert gave_up[-1].payload.get("effective_limit") == 4 + finally: + conn.close() + + def test_workspace_resolution_failure_also_counts(kanban_home, all_assignees_spawnable): """`dir:` workspace with no path should fail workspace resolution AND count against the failure budget — not just crash the tick."""