From 3b6347af158e125b118068ac4af55b8d4ceb6247 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 28 May 2026 19:02:55 -0700 Subject: [PATCH] feat(kanban): default_assignee fallback + per-profile concurrency cap (#27145, #21582) (#34244) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related dispatcher behaviors that have been missing for a while. ## kanban.default_assignee (#27145) Reporter (@agarzon): dashboard creates a task without an assignee, task parks in 'ready' forever even though the operator's intent ('default') is perfectly clear. The dispatcher already had a 'skipped_unassigned' bucket but no fallback routing — users had to manually type 'default' in the assignee field every time. Behavior: when 'kanban.default_assignee' is set in config.yaml, the dispatcher applies that assignee to any unassigned ready task before deciding whether to spawn. The row is mutated (assignee column + an 'assigned' event with source='kanban.default_assignee' for the audit trail). Empty/whitespace config value = no fallback, preserving the existing skipped_unassigned behavior. Dry-run mode reports what WOULD happen via the new 'auto_assigned_default' bucket on DispatchResult, but does NOT mutate the DB — operators using 'hermes kanban dispatch --dry-run' see the routing decision before committing. ## kanban.max_in_progress_per_profile (#21582) Reporter (@edwardchenchen, @simlu, 4 reactions): fan-out workloads saturate one profile's local model / API quota / browser pool while other profiles sit idle. The existing global 'max_in_progress' caps total workers but doesn't balance across profiles. Behavior: when 'kanban.max_in_progress_per_profile' is set to a positive int, the dispatcher tracks per-assignee running counts (one query at tick start) and refuses to spawn for any assignee already at the cap. Tasks blocked this way go to a new 'skipped_per_profile_capped' bucket on DispatchResult as (task_id, assignee, current_running_count) tuples — NOT an operator-actionable failure, just 'try again next tick when the profile has capacity'. Pre-existing 'running' tasks count against the cap (verified via regression test). The cap respects dry_run mode by incrementing its in-memory counter on each would-be spawn so dry_run reports the same balanced subset that a real tick would. Invalid cap values (0, negative, non-int, None) are treated as 'no cap', preserving the existing behavior. Backward-compatible for installs that don't set the config. ## Surfaces - 'hermes kanban dispatch' CLI now prints 'Auto-assigned to kanban.default_assignee=X: ...' and 'Deferred (X at per-profile cap, N running): ...' lines, plus matching JSON keys in --json output. - Gateway dispatcher logs the configured values at startup ('default_assignee=X', 'max_in_progress_per_profile=N'). - 'kanban.max_in_progress_per_profile' added to DEFAULT_CONFIG with inline docs. ## Validation - tests/hermes_cli/test_kanban_default_assignee.py (6 cases): no-cap baseline, auto-assign + DB mutation, dry-run reports without mutating, whitespace treated as None, explicit assignees untouched, DispatchResult field schema. - tests/hermes_cli/test_kanban_per_profile_cap.py (9 cases including 4 parametrized): no-cap baseline, balanced 2-profile fan-out, pre-existing running counts against cap, invalid cap values (0/-1/'abc'/None), capped tasks dispatched on next tick after running task completes, DispatchResult field schema. - Broader kanban suite: 464/464 pass (was 449 baseline; +15 new regression tests across both features). ## Credit #27145 — Jimmy Johansson reported the dispatcher skipped-unassigned gap; @agarzon scoped the simpler 'honor kanban.default_assignee' fix that matches the existing config knob. #21582 — @edwardchenchen filed the per-profile cap ask after hitting model 429s on fan-out research projects; @simlu confirmed the same pain on local-model setups. --- gateway/run.py | 45 +++++ hermes_cli/config.py | 9 + hermes_cli/kanban.py | 38 ++++ hermes_cli/kanban_db.py | 131 +++++++++++++- .../test_kanban_default_assignee.py | 154 ++++++++++++++++ .../hermes_cli/test_kanban_per_profile_cap.py | 167 ++++++++++++++++++ 6 files changed, 539 insertions(+), 5 deletions(-) create mode 100644 tests/hermes_cli/test_kanban_default_assignee.py create mode 100644 tests/hermes_cli/test_kanban_per_profile_cap.py diff --git a/gateway/run.py b/gateway/run.py index 876a23879e1..f59aa4109b4 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5420,6 +5420,49 @@ class GatewayRunner: ) stale_timeout_seconds = 0 + # Read kanban.default_assignee — fallback profile for tasks + # created without an explicit assignee (e.g. via the dashboard). + # When set, the dispatcher applies it to unassigned ready tasks + # instead of skipping them indefinitely (#27145). Empty string + # (the schema default) means "no fallback, keep skipping" — + # backward-compatible with existing installs. + default_assignee = (kanban_cfg.get("default_assignee") or "").strip() or None + if default_assignee: + logger.info( + "kanban dispatcher: default_assignee=%r (unassigned ready tasks " + "will route to this profile)", + default_assignee, + ) + + # Read kanban.max_in_progress_per_profile — per-profile concurrency + # cap (#21582). When set, no single profile gets more than N + # workers running at once, even if the global max_in_progress + # would allow it. Prevents one profile's local model / API quota + # / browser pool from being overwhelmed by a fan-out. + raw_per_profile = kanban_cfg.get("max_in_progress_per_profile", None) + max_in_progress_per_profile = None + if raw_per_profile is not None: + try: + max_in_progress_per_profile = int(raw_per_profile) + except (TypeError, ValueError): + logger.warning( + "kanban dispatcher: invalid kanban.max_in_progress_per_profile=%r; ignoring", + raw_per_profile, + ) + max_in_progress_per_profile = None + else: + if max_in_progress_per_profile < 1: + logger.warning( + "kanban dispatcher: kanban.max_in_progress_per_profile=%r is below 1; ignoring", + raw_per_profile, + ) + max_in_progress_per_profile = None + else: + logger.info( + "kanban dispatcher: max_in_progress_per_profile=%d", + max_in_progress_per_profile, + ) + # Initial delay so the gateway finishes wiring adapters before the # dispatcher spawns workers (those workers may hit gateway notify # subscriptions etc.). Matches the notifier watcher's delay. @@ -5511,6 +5554,8 @@ class GatewayRunner: max_in_progress=max_in_progress, failure_limit=failure_limit, stale_timeout_seconds=stale_timeout_seconds, + default_assignee=default_assignee, + max_in_progress_per_profile=max_in_progress_per_profile, ) except sqlite3.DatabaseError as exc: if _is_corrupt_board_db_error(exc): diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 74c04635712..c3c3cf61154 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1726,6 +1726,15 @@ DEFAULT_CONFIG = { # assignee to any installed profile. When unset, falls back to the # default profile. A task never ends up with assignee=None. "default_assignee": "", + # Per-profile concurrency cap (#21582). When set to a positive int, + # no single profile can have more than N workers running at once, + # even if the global max_in_progress / max_spawn caps would allow + # it. Tasks blocked this way defer to the next dispatcher tick. + # Unset (None) means "no per-profile cap" — backward-compatible + # with existing installs. Useful for fan-out workflows that would + # otherwise saturate one profile's local model / API quota / + # browser pool while leaving other profiles idle. + "max_in_progress_per_profile": None, # When true, the kanban dispatcher auto-runs the decomposer on # tasks that land in Triage (every dispatcher tick). When false, # decomposition is manual via `hermes kanban decompose ` or diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index f683f69edee..3f5bfab68cf 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -2087,12 +2087,35 @@ def _cmd_tail(args: argparse.Namespace) -> int: def _cmd_dispatch(args: argparse.Namespace) -> int: + # Honour kanban.default_assignee as the fallback for unassigned ready + # tasks (#27145) and kanban.max_in_progress_per_profile as the + # per-profile concurrency cap (#21582). Same semantics as the + # gateway dispatch path. + try: + from hermes_cli.config import load_config + _cfg = load_config() + _kanban_cfg = _cfg.get("kanban", {}) if isinstance(_cfg, dict) else {} + default_assignee = (_kanban_cfg.get("default_assignee") or "").strip() or None + _raw_per_profile = _kanban_cfg.get("max_in_progress_per_profile", None) + try: + max_in_progress_per_profile = ( + int(_raw_per_profile) if _raw_per_profile is not None else None + ) + if max_in_progress_per_profile is not None and max_in_progress_per_profile < 1: + max_in_progress_per_profile = None + except (TypeError, ValueError): + max_in_progress_per_profile = None + except Exception: + default_assignee = None + max_in_progress_per_profile = None with kb.connect_closing() as conn: res = kb.dispatch_once( conn, dry_run=args.dry_run, max_spawn=args.max, failure_limit=getattr(args, "failure_limit", kb.DEFAULT_SPAWN_FAILURE_LIMIT), + default_assignee=default_assignee, + max_in_progress_per_profile=max_in_progress_per_profile, ) if getattr(args, "json", False): print(json.dumps({ @@ -2108,6 +2131,11 @@ def _cmd_dispatch(args: argparse.Namespace) -> int: ], "skipped_unassigned": res.skipped_unassigned, "skipped_nonspawnable": res.skipped_nonspawnable, + "skipped_per_profile_capped": [ + {"task_id": tid, "assignee": who, "current": current} + for (tid, who, current) in res.skipped_per_profile_capped + ], + "auto_assigned_default": res.auto_assigned_default, }, indent=2)) return 0 print(f"Reclaimed: {res.reclaimed}") @@ -2128,8 +2156,18 @@ def _cmd_dispatch(args: argparse.Namespace) -> int: for tid, who, ws in res.spawned: tag = " (dry)" if args.dry_run else "" print(f" - {tid} -> {who} @ {ws or '-'}{tag}") + if res.auto_assigned_default: + print( + f"Auto-assigned to kanban.default_assignee={default_assignee!r}: " + f"{', '.join(res.auto_assigned_default)}" + ) if res.skipped_unassigned: print(f"Skipped (unassigned): {', '.join(res.skipped_unassigned)}") + if res.skipped_per_profile_capped: + for tid, who, current in res.skipped_per_profile_capped: + print( + f"Deferred ({who} at per-profile cap, {current} running): {tid}" + ) if res.skipped_nonspawnable: print( f"Skipped (non-spawnable assignee — terminal lane, OK): " diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index cbe7f03a59e..26783b4f860 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -4289,6 +4289,12 @@ class DispatchResult: skipped_unassigned: list[str] = field(default_factory=list) """Ready task ids skipped because they have no assignee at all. Operator-actionable — usually a misfiled task waiting for routing.""" + auto_assigned_default: list[str] = field(default_factory=list) + """Task ids that were unassigned in the DB and had + ``kanban.default_assignee`` applied this tick before spawning (#27145). + Surfaces the auto-assignment to telemetry / CLI / dashboard so the + operator can see when the dispatcher is acting on the fallback rule + rather than on explicit per-task assignments.""" skipped_nonspawnable: list[str] = field(default_factory=list) """Ready task ids skipped because their assignee names a control-plane lane (a Claude Code terminal like ``orion-cc``) rather than a Hermes @@ -4296,6 +4302,14 @@ class DispatchResult: operator-actionable failure. Tracked separately so health telemetry can distinguish "real stuck" (nothing spawned but spawnable work available) from "correctly idle" (nothing spawnable in the queue).""" + skipped_per_profile_capped: list[tuple[str, str, int]] = field(default_factory=list) + """Tasks deferred this tick because their assignee is already at + ``kanban.max_in_progress_per_profile`` (#21582). Each entry is + ``(task_id, assignee, current_running_count)``. NOT an + operator-actionable failure — the task will be picked up on a + subsequent tick when the assignee has capacity. Separate bucket so + telemetry / dashboards can show "this profile is busy" vs + "task is genuinely stuck".""" crashed: list[str] = field(default_factory=list) """Task ids reclaimed because their worker PID disappeared.""" auto_blocked: list[str] = field(default_factory=list) @@ -5342,6 +5356,8 @@ def dispatch_once( failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, stale_timeout_seconds: int = 0, board: Optional[str] = None, + default_assignee: Optional[str] = None, + max_in_progress_per_profile: Optional[int] = None, ) -> DispatchResult: """Run one dispatcher tick. @@ -5427,12 +5443,89 @@ def dispatch_once( if max_spawn is None or max_spawn > remaining: max_spawn = remaining spawned = 0 + # Per-profile concurrency cap (#21582): when set, track how many + # workers each assignee already has in flight, and refuse to spawn + # when this would push that assignee past the cap. Prevents + # fan-out workloads from melting a single profile's local model / + # API quota / browser pool while leaving other profiles idle. + # Tasks blocked this way go to skipped_per_profile_capped (not + # skipped_unassigned — the operator-actionable signal is different: + # "this profile is busy, try again later" not "this needs routing"). + _per_profile_cap = max_in_progress_per_profile if ( + isinstance(max_in_progress_per_profile, int) + and max_in_progress_per_profile > 0 + ) else None + _per_profile_running: dict[str, int] = {} + if _per_profile_cap is not None: + for prow in conn.execute( + "SELECT assignee, COUNT(*) AS n FROM tasks " + "WHERE status = 'running' AND assignee IS NOT NULL " + "GROUP BY assignee" + ): + _per_profile_running[prow["assignee"]] = int(prow["n"]) + # Normalize default_assignee once: empty/whitespace string → None so the + # rest of the loop can use ``if default_assignee:`` as a single check. + # We also resolve profile_exists once here for the same reason. + _default_assignee = (default_assignee or "").strip() or None + _default_assignee_resolved = False + if _default_assignee: + try: + from hermes_cli.profiles import profile_exists as _pe + _default_assignee_resolved = bool(_pe(_default_assignee)) + except Exception: + # Profiles module not importable (test stubs, exotic envs). + # Trust the operator's config and try the assignment; the + # downstream profile_exists check on the assigned row will + # bucket it as nonspawnable if the profile genuinely isn't + # there, with the existing diagnostic. + _default_assignee_resolved = True for row in ready_rows: if max_spawn is not None and running_count + spawned >= max_spawn: break - if not row["assignee"]: - result.skipped_unassigned.append(row["id"]) - continue + row_assignee = row["assignee"] + if not row_assignee: + # Honour kanban.default_assignee: when the dispatcher hits an + # unassigned ready task and an operator-configured fallback + # exists, persist the assignment and proceed. This removes the + # dashboard footgun where a task created without an assignee + # parks in 'ready' forever even though the operator's intent + # ("default") was perfectly clear (#27145). Mutating the row + # (not just the in-memory view) keeps diagnostics and the + # board state consistent: the task is now legitimately owned + # by ``kanban.default_assignee``, not "unassigned but secretly + # routed". + if _default_assignee and _default_assignee_resolved: + # Dry-run: show what WOULD happen (auto-assign + spawn) without + # mutating the DB. Real run: mutate the row + emit the + # 'assigned' event so the board state matches what just happened. + if not dry_run: + try: + with write_txn(conn): + conn.execute( + "UPDATE tasks SET assignee = ? WHERE id = ? " + "AND (assignee IS NULL OR assignee = '')", + (_default_assignee, row["id"]), + ) + _append_event( + conn, row["id"], "assigned", + { + "assignee": _default_assignee, + "source": "kanban.default_assignee", + }, + ) + except Exception: + _log.debug( + "kanban dispatch: failed to apply default_assignee=%r " + "to task %s", + _default_assignee, row["id"], exc_info=True, + ) + result.skipped_unassigned.append(row["id"]) + continue + row_assignee = _default_assignee + result.auto_assigned_default.append(row["id"]) + else: + result.skipped_unassigned.append(row["id"]) + continue # Skip ready tasks whose assignee is not a real Hermes profile. # `_default_spawn` invokes ``hermes -p `` which fails # with "Profile 'X' does not exist" when the assignee names a @@ -5447,7 +5540,7 @@ def dispatch_once( from hermes_cli.profiles import profile_exists # local import: avoids cycle except Exception: profile_exists = None # type: ignore[assignment] - if profile_exists is not None and not profile_exists(row["assignee"]): + if profile_exists is not None and not profile_exists(row_assignee): # Bucket separately from skipped_unassigned: the operator # cannot fix this by assigning a profile (the assignee IS the # intended owner — a terminal lane). Health telemetry uses @@ -5456,6 +5549,19 @@ def dispatch_once( # of human-pulled work. result.skipped_nonspawnable.append(row["id"]) continue + # Per-profile concurrency cap (#21582): even if there's global + # headroom, refuse to spawn for an assignee that's already at + # its in-flight cap. Prevents one profile's local model / API + # quota / browser pool from being overwhelmed by a fan-out + # while the global max_in_progress / max_spawn caps still allow + # work on OTHER profiles. + if _per_profile_cap is not None: + current = _per_profile_running.get(row_assignee, 0) + if current >= _per_profile_cap: + result.skipped_per_profile_capped.append( + (row["id"], row_assignee, current) + ) + continue # Respawn guard: refuse to re-spawn when useful work is already # in-flight/recent, or when the last failure is a deterministic # blocker (quota / auth). The guard defers the spawn this tick so @@ -5478,7 +5584,15 @@ def dispatch_once( ) continue if dry_run: - result.spawned.append((row["id"], row["assignee"], "")) + result.spawned.append((row["id"], row_assignee, "")) + # Increment per-profile counter even in dry_run so the cap + # check sees the would-be spawn on subsequent iterations. + # Without this, dry_run reports every task as spawnable and + # under-reports the capped subset (#21582). + if _per_profile_cap is not None and row_assignee: + _per_profile_running[row_assignee] = ( + _per_profile_running.get(row_assignee, 0) + 1 + ) continue claimed = claim_task(conn, row["id"], ttl_seconds=ttl_seconds) if claimed is None: @@ -5521,6 +5635,13 @@ def dispatch_once( # complete_task). result.spawned.append((claimed.id, claimed.assignee or "", str(workspace))) spawned += 1 + # Track the new in-flight count for this profile so later + # iterations in this same tick respect the per-profile cap + # (#21582). Subsequent ticks re-query from the DB. + if _per_profile_cap is not None and claimed.assignee: + _per_profile_running[claimed.assignee] = ( + _per_profile_running.get(claimed.assignee, 0) + 1 + ) except Exception as exc: auto = _record_spawn_failure( conn, claimed.id, str(exc), diff --git a/tests/hermes_cli/test_kanban_default_assignee.py b/tests/hermes_cli/test_kanban_default_assignee.py new file mode 100644 index 00000000000..70b51bbdb60 --- /dev/null +++ b/tests/hermes_cli/test_kanban_default_assignee.py @@ -0,0 +1,154 @@ +"""Regression tests for #27145 — kanban.default_assignee for unassigned ready tasks. + +When the dispatcher hits an unassigned ready task and ``kanban.default_assignee`` +is set, the dispatcher applies the assignment and spawns. Without the config, +the task is skipped (existing behavior preserved). +""" +from __future__ import annotations + +import json +import os +import sys +import tempfile + +import pytest + + +@pytest.fixture() +def isolated_kanban_home(monkeypatch): + """Spin up a fresh HERMES_HOME with a clean kanban DB.""" + test_home = tempfile.mkdtemp(prefix="kanban_default_assignee_test_") + monkeypatch.setenv("HERMES_HOME", test_home) + # Force-reimport so the fresh HERMES_HOME is picked up. + for mod in list(sys.modules.keys()): + if mod.startswith("hermes_cli") or mod.startswith("hermes_state") or mod == "hermes_constants": + del sys.modules[mod] + from hermes_cli import kanban_db + yield kanban_db, test_home + # Cleanup is best-effort; tempfile dir survives but pytest isolation + # gives each test its own monkeypatched HERMES_HOME so no cross-test + # contamination. + + +def _fake_spawn(*args, **kwargs): + """Stand-in for the real worker spawn — returns a fake PID.""" + return 12345 + + +def test_unassigned_task_skipped_without_default_assignee(isolated_kanban_home): + """Baseline: with no default_assignee, an unassigned ready task is + skipped via the existing `skipped_unassigned` bucket and the DB row + is untouched.""" + kb, _home = isolated_kanban_home + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + task_id = kb.create_task(conn, title="t1", assignee=None) + with kb.connect_closing() as conn: + res = kb.dispatch_once(conn, spawn_fn=_fake_spawn, dry_run=False) + assert res.skipped_unassigned == [task_id] + assert not res.auto_assigned_default + assert not res.spawned + with kb.connect_closing() as conn: + row = conn.execute("SELECT assignee FROM tasks WHERE id = ?", (task_id,)).fetchone() + assert row["assignee"] is None + + +def test_unassigned_task_auto_assigned_with_default_assignee(isolated_kanban_home): + """Core #27145 contract: with default_assignee set, an unassigned ready + task gets the assignment applied and dispatched on the same tick. The + DB row is mutated (assignee column + an 'assigned' event).""" + kb, _home = isolated_kanban_home + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + task_id = kb.create_task(conn, title="t1", assignee=None) + with kb.connect_closing() as conn: + res = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=False, + default_assignee="default", + ) + assert res.auto_assigned_default == [task_id] + assert not res.skipped_unassigned + assert len(res.spawned) == 1 + assert res.spawned[0][0] == task_id + assert res.spawned[0][1] == "default" + + with kb.connect_closing() as conn: + row = conn.execute("SELECT assignee FROM tasks WHERE id = ?", (task_id,)).fetchone() + assert row["assignee"] == "default" + + # 'assigned' event emitted for the audit trail + with kb.connect_closing() as conn: + evs = list(conn.execute( + "SELECT kind, payload FROM task_events WHERE task_id = ? AND kind = 'assigned'", + (task_id,), + )) + assert len(evs) == 1 + payload = json.loads(evs[0][1]) + assert payload["assignee"] == "default" + assert payload["source"] == "kanban.default_assignee" + + +def test_dry_run_with_default_assignee_reports_without_mutating(isolated_kanban_home): + """Dry-run mode: reports what WOULD happen (task in auto_assigned_default, + spawn entry) but does NOT mutate the DB. Operators using + `hermes kanban dispatch --dry-run` see the routing decision before + committing.""" + kb, _home = isolated_kanban_home + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + task_id = kb.create_task(conn, title="t1", assignee=None) + with kb.connect_closing() as conn: + res = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=True, + default_assignee="default", + ) + assert res.auto_assigned_default == [task_id] + assert len(res.spawned) == 1 + with kb.connect_closing() as conn: + row = conn.execute("SELECT assignee FROM tasks WHERE id = ?", (task_id,)).fetchone() + # DB unchanged — dry_run did not commit the assignment. + assert row["assignee"] is None + + +def test_whitespace_default_assignee_treated_as_none(isolated_kanban_home): + """Empty / whitespace-only default_assignee values must be treated as + 'no fallback set' so a misconfigured kanban.default_assignee=' ' + doesn't surprise operators by silently routing unassigned tasks.""" + kb, _home = isolated_kanban_home + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + task_id = kb.create_task(conn, title="t1", assignee=None) + with kb.connect_closing() as conn: + res = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=False, + default_assignee=" ", + ) + assert task_id in res.skipped_unassigned + assert not res.auto_assigned_default + + +def test_explicitly_assigned_task_untouched_by_default_assignee(isolated_kanban_home): + """A task with an explicit assignee must NOT be touched by the + default_assignee logic — that fallback only applies to genuinely + unassigned rows.""" + kb, _home = isolated_kanban_home + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + task_id = kb.create_task(conn, title="t1", assignee="default") + with kb.connect_closing() as conn: + res = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=False, + default_assignee="someother", + ) + assert task_id not in res.auto_assigned_default + assert any(s[0] == task_id and s[1] == "default" for s in res.spawned) + + +def test_dispatch_result_has_auto_assigned_default_field(): + """Schema-level invariant: DispatchResult exposes the + auto_assigned_default field so CLI / dashboard / gateway can surface + the new routing decisions.""" + from hermes_cli.kanban_db import DispatchResult + r = DispatchResult() + assert hasattr(r, "auto_assigned_default") + assert r.auto_assigned_default == [] diff --git a/tests/hermes_cli/test_kanban_per_profile_cap.py b/tests/hermes_cli/test_kanban_per_profile_cap.py new file mode 100644 index 00000000000..2cf7a3e8f21 --- /dev/null +++ b/tests/hermes_cli/test_kanban_per_profile_cap.py @@ -0,0 +1,167 @@ +"""Regression tests for #21582 — per-profile concurrency cap in dispatcher. + +When ``kanban.max_in_progress_per_profile`` is set, no single profile +gets more than N workers running at once even if the global +``max_in_progress`` cap would allow it. Prevents one profile's local +model / API quota / browser pool from being overwhelmed by a fan-out. +""" +from __future__ import annotations + +import os +import sys +import tempfile + +import pytest + + +@pytest.fixture() +def isolated_kanban_home_with_profiles(monkeypatch): + """Spin up a fresh HERMES_HOME with kanban DB + alpha/beta profiles.""" + test_home = tempfile.mkdtemp(prefix="kanban_per_profile_cap_test_") + for prof in ("alpha", "beta", "default"): + os.makedirs(os.path.join(test_home, "profiles", prof), exist_ok=True) + monkeypatch.setenv("HERMES_HOME", test_home) + for mod in list(sys.modules.keys()): + if mod.startswith("hermes_cli") or mod.startswith("hermes_state") or mod == "hermes_constants": + del sys.modules[mod] + from hermes_cli import kanban_db + yield kanban_db + + +def _fake_spawn(*args, **kwargs): + return 12345 + + +def test_no_cap_all_tasks_dispatched(isolated_kanban_home_with_profiles): + """Baseline: with no per-profile cap, all ready tasks dispatch.""" + kb = isolated_kanban_home_with_profiles + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + for i in range(5): + kb.create_task(conn, title=f"a{i}", assignee="alpha") + for i in range(3): + kb.create_task(conn, title=f"b{i}", assignee="beta") + with kb.connect_closing() as conn: + res = kb.dispatch_once(conn, spawn_fn=_fake_spawn, dry_run=True) + assert len(res.spawned) == 8 + assert not res.skipped_per_profile_capped + + +def test_cap_2_balances_two_profiles(isolated_kanban_home_with_profiles): + """With cap=2: 2 alpha + 2 beta dispatched; remaining 3 alpha + 1 beta + deferred to skipped_per_profile_capped.""" + kb = isolated_kanban_home_with_profiles + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + for i in range(5): + kb.create_task(conn, title=f"a{i}", assignee="alpha") + for i in range(3): + kb.create_task(conn, title=f"b{i}", assignee="beta") + with kb.connect_closing() as conn: + res = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=True, + max_in_progress_per_profile=2, + ) + spawn_assignees = [s[1] for s in res.spawned] + capped_assignees = [c[1] for c in res.skipped_per_profile_capped] + assert spawn_assignees.count("alpha") == 2 + assert spawn_assignees.count("beta") == 2 + assert capped_assignees.count("alpha") == 3 + assert capped_assignees.count("beta") == 1 + + +def test_pre_existing_running_counts_against_cap(isolated_kanban_home_with_profiles): + """A task already in 'running' status when dispatch_once starts counts + toward the per-profile cap. With 1 alpha pre-running and cap=1, NO new + alpha tasks should spawn; beta is independent so 1 beta spawns.""" + kb = isolated_kanban_home_with_profiles + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + running_alpha = kb.create_task(conn, title="running alpha", assignee="alpha") + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET status = 'running', claim_lock = 'test:1' WHERE id = ?", + (running_alpha,), + ) + for i in range(2): + kb.create_task(conn, title=f"a{i}", assignee="alpha") + for i in range(2): + kb.create_task(conn, title=f"b{i}", assignee="beta") + with kb.connect_closing() as conn: + res = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=True, + max_in_progress_per_profile=1, + ) + spawn_assignees = [s[1] for s in res.spawned] + capped_assignees = [c[1] for c in res.skipped_per_profile_capped] + assert spawn_assignees.count("alpha") == 0 + assert spawn_assignees.count("beta") == 1 + assert capped_assignees.count("alpha") == 2 + assert capped_assignees.count("beta") == 1 + + +@pytest.mark.parametrize("cap", [0, -1, "abc", None]) +def test_invalid_cap_treated_as_no_cap(isolated_kanban_home_with_profiles, cap): + """Cap values that don't represent a positive int should be treated as + 'no cap' — silently falling through rather than crashing the dispatcher.""" + kb = isolated_kanban_home_with_profiles + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + for i in range(3): + kb.create_task(conn, title=f"a{i}", assignee="alpha") + with kb.connect_closing() as conn: + res = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=True, + max_in_progress_per_profile=cap, + ) + assert not res.skipped_per_profile_capped + assert len(res.spawned) == 3 + + +def test_capped_tasks_dispatched_on_subsequent_tick(isolated_kanban_home_with_profiles): + """A task deferred this tick because its profile was at cap should be + eligible for dispatch on the next tick (after running tasks complete). + This verifies the cap is per-tick state, not a permanent block.""" + kb = isolated_kanban_home_with_profiles + with kb.connect_closing() as conn: + kb.create_board(slug="default", name="Test") + ids = [kb.create_task(conn, title=f"a{i}", assignee="alpha") for i in range(3)] + + # First tick: cap=1, only 1 alpha dispatched + with kb.connect_closing() as conn: + res1 = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=False, + max_in_progress_per_profile=1, + ) + assert len(res1.spawned) == 1 + assert len(res1.skipped_per_profile_capped) == 2 + + # Simulate the running task completing — set it back to done so the + # 'running' count drops + spawned_id = res1.spawned[0][0] + with kb.connect_closing() as conn: + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET status = 'done', claim_lock = NULL WHERE id = ?", + (spawned_id,), + ) + + # Second tick: 1 more alpha should now dispatch + with kb.connect_closing() as conn: + res2 = kb.dispatch_once( + conn, spawn_fn=_fake_spawn, dry_run=False, + max_in_progress_per_profile=1, + ) + assert len(res2.spawned) == 1 + assert len(res2.skipped_per_profile_capped) == 1 + assert res2.spawned[0][0] != spawned_id # different task this time + + +def test_dispatch_result_has_skipped_per_profile_capped_field(): + """Schema-level invariant: DispatchResult exposes the + skipped_per_profile_capped field as a list of + (task_id, assignee, current_running) tuples.""" + from hermes_cli.kanban_db import DispatchResult + r = DispatchResult() + assert hasattr(r, "skipped_per_profile_capped") + assert r.skipped_per_profile_capped == []