mirror of https://github.com/NousResearch/hermes-agent.git (synced 2026-06-03 07:21:54 +00:00)
Two related dispatcher behaviors that have been missing for a while.

## kanban.default_assignee (#27145)

Reporter (@agarzon): the dashboard creates a task without an assignee, and the task parks in 'ready' forever even though the operator's intent ('default') is perfectly clear. The dispatcher already had a 'skipped_unassigned' bucket but no fallback routing, so users had to manually type 'default' in the assignee field every time.

Behavior: when 'kanban.default_assignee' is set in config.yaml, the dispatcher applies that assignee to any unassigned ready task before deciding whether to spawn. The row is mutated (assignee column + an 'assigned' event with source='kanban.default_assignee' for the audit trail). An empty or whitespace-only config value means no fallback, preserving the existing skipped_unassigned behavior.

Dry-run mode reports what WOULD happen via the new 'auto_assigned_default' bucket on DispatchResult, but does NOT mutate the DB, so operators using 'hermes kanban dispatch --dry-run' see the routing decision before committing.

## kanban.max_in_progress_per_profile (#21582)

Reporter (@edwardchenchen, @simlu, 4 reactions): fan-out workloads saturate one profile's local model / API quota / browser pool while other profiles sit idle. The existing global 'max_in_progress' caps total workers but doesn't balance across profiles.

Behavior: when 'kanban.max_in_progress_per_profile' is set to a positive int, the dispatcher tracks per-assignee running counts (one query at tick start) and refuses to spawn for any assignee already at the cap. Tasks blocked this way go to a new 'skipped_per_profile_capped' bucket on DispatchResult as (task_id, assignee, current_running_count) tuples. This is NOT an operator-actionable failure, just 'try again next tick when the profile has capacity'. Pre-existing 'running' tasks count against the cap (verified via regression test).

The cap respects dry_run mode by incrementing its in-memory counter on each would-be spawn, so dry_run reports the same balanced subset that a real tick would. Invalid cap values (0, negative, non-int, None) are treated as 'no cap', preserving the existing behavior. Backward-compatible for installs that don't set the config.

## Surfaces

- 'hermes kanban dispatch' CLI now prints 'Auto-assigned to kanban.default_assignee=X: ...' and 'Deferred (X at per-profile cap, N running): ...' lines, plus matching JSON keys in --json output.
- Gateway dispatcher logs the configured values at startup ('default_assignee=X', 'max_in_progress_per_profile=N').
- 'kanban.max_in_progress_per_profile' added to DEFAULT_CONFIG with inline docs.

## Validation

- tests/hermes_cli/test_kanban_default_assignee.py (6 cases): baseline without default_assignee, auto-assign + DB mutation, dry-run reports without mutating, whitespace treated as None, explicit assignees untouched, DispatchResult field schema.
- tests/hermes_cli/test_kanban_per_profile_cap.py (9 cases including 4 parametrized): no-cap baseline, balanced 2-profile fan-out, pre-existing running counts against cap, invalid cap values (0/-1/'abc'/None), capped tasks dispatched on next tick after running task completes, DispatchResult field schema.
- Broader kanban suite: 464/464 pass (was 449 baseline; +15 new regression tests across both features).

## Credit

- #27145: Jimmy Johansson reported the dispatcher skipped-unassigned gap; @agarzon scoped the simpler 'honor kanban.default_assignee' fix that matches the existing config knob.
- #21582: @edwardchenchen filed the per-profile cap ask after hitting model 429s on fan-out research projects; @simlu confirmed the same pain on local-model setups.
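The two behaviors described above can be illustrated with a small, self-contained planner. This is only a sketch under stated assumptions: `plan_tick` and its tuple-based inputs are hypothetical names invented for illustration, not part of hermes-agent, and the real `dispatch_once` also handles claiming, respawn guards, profile existence checks and DB writes that are left out here.

```python
from typing import Optional


def plan_tick(
    ready: list[tuple[str, Optional[str]]],
    running: dict[str, int],
    default_assignee: Optional[str] = None,
    cap: Optional[int] = None,
):
    """Hypothetical dry-run planner for one dispatcher tick.

    ready   -- (task_id, assignee) pairs in dispatch order
    running -- assignee -> number of tasks already running
    """
    # Empty / whitespace-only fallback means "no fallback".
    fallback = (default_assignee or "").strip() or None
    # Anything that is not a positive int means "no cap".
    limit = cap if isinstance(cap, int) and cap > 0 else None

    counts = dict(running)  # copy so the caller's dict is not mutated
    spawned, auto_assigned, unassigned, capped = [], [], [], []
    for task_id, assignee in ready:
        if not assignee:
            if fallback is None:
                unassigned.append(task_id)
                continue
            assignee = fallback
            auto_assigned.append(task_id)
        current = counts.get(assignee, 0)
        if limit is not None and current >= limit:
            capped.append((task_id, assignee, current))
            continue
        # Count the would-be spawn so later tasks in this tick see it.
        counts[assignee] = current + 1
        spawned.append((task_id, assignee))
    return spawned, auto_assigned, unassigned, capped


if __name__ == "__main__":
    tasks = [(f"a{i}", "alpha") for i in range(5)] + [(f"b{i}", "beta") for i in range(3)]
    print(plan_tick(tasks, running={}, cap=2))
```

Keeping the counter in memory and bumping it on every would-be spawn is what lets a dry run report the same balanced subset as a real tick.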
parent 42612aa350
commit 3b6347af15
6 changed files with 539 additions and 5 deletions
@@ -5420,6 +5420,49 @@ class GatewayRunner:
             )
             stale_timeout_seconds = 0
 
+        # Read kanban.default_assignee — fallback profile for tasks
+        # created without an explicit assignee (e.g. via the dashboard).
+        # When set, the dispatcher applies it to unassigned ready tasks
+        # instead of skipping them indefinitely (#27145). Empty string
+        # (the schema default) means "no fallback, keep skipping" —
+        # backward-compatible with existing installs.
+        default_assignee = (kanban_cfg.get("default_assignee") or "").strip() or None
+        if default_assignee:
+            logger.info(
+                "kanban dispatcher: default_assignee=%r (unassigned ready tasks "
+                "will route to this profile)",
+                default_assignee,
+            )
+
+        # Read kanban.max_in_progress_per_profile — per-profile concurrency
+        # cap (#21582). When set, no single profile gets more than N
+        # workers running at once, even if the global max_in_progress
+        # would allow it. Prevents one profile's local model / API quota
+        # / browser pool from being overwhelmed by a fan-out.
+        raw_per_profile = kanban_cfg.get("max_in_progress_per_profile", None)
+        max_in_progress_per_profile = None
+        if raw_per_profile is not None:
+            try:
+                max_in_progress_per_profile = int(raw_per_profile)
+            except (TypeError, ValueError):
+                logger.warning(
+                    "kanban dispatcher: invalid kanban.max_in_progress_per_profile=%r; ignoring",
+                    raw_per_profile,
+                )
+                max_in_progress_per_profile = None
+            else:
+                if max_in_progress_per_profile < 1:
+                    logger.warning(
+                        "kanban dispatcher: kanban.max_in_progress_per_profile=%r is below 1; ignoring",
+                        raw_per_profile,
+                    )
+                    max_in_progress_per_profile = None
+                else:
+                    logger.info(
+                        "kanban dispatcher: max_in_progress_per_profile=%d",
+                        max_in_progress_per_profile,
+                    )
+
         # Initial delay so the gateway finishes wiring adapters before the
         # dispatcher spawns workers (those workers may hit gateway notify
         # subscriptions etc.). Matches the notifier watcher's delay.
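The gateway hunk above and the CLI path both normalize the same two config values. As a minimal sketch of that normalization in one place, assuming hypothetical helper names (`parse_per_profile_cap` and `parse_default_assignee` are illustrative only and do not exist in the commit), the rules could be written as follows. The `isinstance` guard in the second helper is an extra assumption for non-string config values, not behavior taken from the diff.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("kanban.sketch")  # hypothetical logger name


def parse_per_profile_cap(raw: Any) -> Optional[int]:
    """Return a positive int cap, or None for 'no cap'."""
    if raw is None:
        return None
    try:
        value = int(raw)
    except (TypeError, ValueError):
        logger.warning("invalid max_in_progress_per_profile=%r; ignoring", raw)
        return None
    if value < 1:
        logger.warning("max_in_progress_per_profile=%r is below 1; ignoring", raw)
        return None
    return value


def parse_default_assignee(raw: Any) -> Optional[str]:
    """Return the stripped profile name, or None for 'no fallback'."""
    if not isinstance(raw, str):  # assumption: non-strings count as unset
        return None
    return raw.strip() or None


if __name__ == "__main__":
    for candidate in (None, 0, -1, "abc", "3", 2):
        print(repr(candidate), "->", parse_per_profile_cap(candidate))
```

A shared helper like this would keep the gateway and the `hermes kanban dispatch` command from drifting apart on edge cases, though the commit itself inlines the logic in both places.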
@@ -5511,6 +5554,8 @@ class GatewayRunner:
                         max_in_progress=max_in_progress,
                         failure_limit=failure_limit,
                         stale_timeout_seconds=stale_timeout_seconds,
+                        default_assignee=default_assignee,
+                        max_in_progress_per_profile=max_in_progress_per_profile,
                     )
                 except sqlite3.DatabaseError as exc:
                     if _is_corrupt_board_db_error(exc):

@@ -1726,6 +1726,15 @@ DEFAULT_CONFIG = {
         # assignee to any installed profile. When unset, falls back to the
         # default profile. A task never ends up with assignee=None.
         "default_assignee": "",
+        # Per-profile concurrency cap (#21582). When set to a positive int,
+        # no single profile can have more than N workers running at once,
+        # even if the global max_in_progress / max_spawn caps would allow
+        # it. Tasks blocked this way defer to the next dispatcher tick.
+        # Unset (None) means "no per-profile cap" — backward-compatible
+        # with existing installs. Useful for fan-out workflows that would
+        # otherwise saturate one profile's local model / API quota /
+        # browser pool while leaving other profiles idle.
+        "max_in_progress_per_profile": None,
         # When true, the kanban dispatcher auto-runs the decomposer on
         # tasks that land in Triage (every dispatcher tick). When false,
         # decomposition is manual via `hermes kanban decompose <id>` or

@@ -2087,12 +2087,35 @@ def _cmd_tail(args: argparse.Namespace) -> int:
 
 
 def _cmd_dispatch(args: argparse.Namespace) -> int:
+    # Honour kanban.default_assignee as the fallback for unassigned ready
+    # tasks (#27145) and kanban.max_in_progress_per_profile as the
+    # per-profile concurrency cap (#21582). Same semantics as the
+    # gateway dispatch path.
+    try:
+        from hermes_cli.config import load_config
+        _cfg = load_config()
+        _kanban_cfg = _cfg.get("kanban", {}) if isinstance(_cfg, dict) else {}
+        default_assignee = (_kanban_cfg.get("default_assignee") or "").strip() or None
+        _raw_per_profile = _kanban_cfg.get("max_in_progress_per_profile", None)
+        try:
+            max_in_progress_per_profile = (
+                int(_raw_per_profile) if _raw_per_profile is not None else None
+            )
+            if max_in_progress_per_profile is not None and max_in_progress_per_profile < 1:
+                max_in_progress_per_profile = None
+        except (TypeError, ValueError):
+            max_in_progress_per_profile = None
+    except Exception:
+        default_assignee = None
+        max_in_progress_per_profile = None
     with kb.connect_closing() as conn:
         res = kb.dispatch_once(
             conn,
             dry_run=args.dry_run,
             max_spawn=args.max,
             failure_limit=getattr(args, "failure_limit", kb.DEFAULT_SPAWN_FAILURE_LIMIT),
+            default_assignee=default_assignee,
+            max_in_progress_per_profile=max_in_progress_per_profile,
         )
     if getattr(args, "json", False):
         print(json.dumps({
@@ -2108,6 +2131,11 @@ def _cmd_dispatch(args: argparse.Namespace) -> int:
             ],
             "skipped_unassigned": res.skipped_unassigned,
             "skipped_nonspawnable": res.skipped_nonspawnable,
+            "skipped_per_profile_capped": [
+                {"task_id": tid, "assignee": who, "current": current}
+                for (tid, who, current) in res.skipped_per_profile_capped
+            ],
+            "auto_assigned_default": res.auto_assigned_default,
         }, indent=2))
         return 0
     print(f"Reclaimed: {res.reclaimed}")
@@ -2128,8 +2156,18 @@ def _cmd_dispatch(args: argparse.Namespace) -> int:
     for tid, who, ws in res.spawned:
         tag = " (dry)" if args.dry_run else ""
         print(f" - {tid} -> {who} @ {ws or '-'}{tag}")
+    if res.auto_assigned_default:
+        print(
+            f"Auto-assigned to kanban.default_assignee={default_assignee!r}: "
+            f"{', '.join(res.auto_assigned_default)}"
+        )
     if res.skipped_unassigned:
         print(f"Skipped (unassigned): {', '.join(res.skipped_unassigned)}")
+    if res.skipped_per_profile_capped:
+        for tid, who, current in res.skipped_per_profile_capped:
+            print(
+                f"Deferred ({who} at per-profile cap, {current} running): {tid}"
+            )
     if res.skipped_nonspawnable:
         print(
             f"Skipped (non-spawnable assignee — terminal lane, OK): "

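The CLI hunks above surface the capped bucket in two shapes: a list of JSON objects and one human-readable line per task. A small sketch of that mapping, assuming hypothetical helper names (`capped_to_json` and `deferred_lines` are illustrative and not functions in the commit), shows how the `(task_id, assignee, current)` tuples translate into each output.

```python
import json

# Type alias for one entry of skipped_per_profile_capped.
Capped = tuple[str, str, int]


def capped_to_json(capped: list[Capped]) -> list[dict]:
    """Shape the tuples like the 'skipped_per_profile_capped' JSON key."""
    return [
        {"task_id": tid, "assignee": who, "current": current}
        for (tid, who, current) in capped
    ]


def deferred_lines(capped: list[Capped]) -> list[str]:
    """Build the plain-text 'Deferred (...)' lines, one per capped task."""
    return [
        f"Deferred ({who} at per-profile cap, {current} running): {tid}"
        for (tid, who, current) in capped
    ]


if __name__ == "__main__":
    sample = [("t_1", "alpha", 2), ("t_2", "beta", 1)]  # made-up task ids
    print(json.dumps(capped_to_json(sample), indent=2))
    print("\n".join(deferred_lines(sample)))
```

Naming the tuple fields in the JSON output means consumers of `--json` do not have to rely on positional order.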
@@ -4289,6 +4289,12 @@ class DispatchResult:
     skipped_unassigned: list[str] = field(default_factory=list)
     """Ready task ids skipped because they have no assignee at all.
     Operator-actionable — usually a misfiled task waiting for routing."""
+    auto_assigned_default: list[str] = field(default_factory=list)
+    """Task ids that were unassigned in the DB and had
+    ``kanban.default_assignee`` applied this tick before spawning (#27145).
+    Surfaces the auto-assignment to telemetry / CLI / dashboard so the
+    operator can see when the dispatcher is acting on the fallback rule
+    rather than on explicit per-task assignments."""
     skipped_nonspawnable: list[str] = field(default_factory=list)
     """Ready task ids skipped because their assignee names a control-plane
     lane (a Claude Code terminal like ``orion-cc``) rather than a Hermes
@@ -4296,6 +4302,14 @@ class DispatchResult:
     operator-actionable failure. Tracked separately so health telemetry
     can distinguish "real stuck" (nothing spawned but spawnable work
     available) from "correctly idle" (nothing spawnable in the queue)."""
+    skipped_per_profile_capped: list[tuple[str, str, int]] = field(default_factory=list)
+    """Tasks deferred this tick because their assignee is already at
+    ``kanban.max_in_progress_per_profile`` (#21582). Each entry is
+    ``(task_id, assignee, current_running_count)``. NOT an
+    operator-actionable failure — the task will be picked up on a
+    subsequent tick when the assignee has capacity. Separate bucket so
+    telemetry / dashboards can show "this profile is busy" vs
+    "task is genuinely stuck"."""
     crashed: list[str] = field(default_factory=list)
     """Task ids reclaimed because their worker PID disappeared."""
     auto_blocked: list[str] = field(default_factory=list)
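The docstrings above separate "this profile is busy" from "task is genuinely stuck". As a rough sketch of how a telemetry consumer might use that split, assuming a cut-down stand-in dataclass (`DispatchResultSketch` and `busy_profiles` are hypothetical and carry only the fields needed here, not the full `DispatchResult`):

```python
from dataclasses import dataclass, field


@dataclass
class DispatchResultSketch:
    """Hypothetical subset of DispatchResult, for illustration only."""

    skipped_unassigned: list[str] = field(default_factory=list)
    auto_assigned_default: list[str] = field(default_factory=list)
    skipped_per_profile_capped: list[tuple[str, str, int]] = field(default_factory=list)


def busy_profiles(result: DispatchResultSketch) -> dict[str, int]:
    """Map each capped assignee to the highest running count observed."""
    busy: dict[str, int] = {}
    for _task_id, assignee, current in result.skipped_per_profile_capped:
        busy[assignee] = max(busy.get(assignee, 0), current)
    return busy


def needs_operator(result: DispatchResultSketch) -> bool:
    """Only unassigned tasks call for routing; capped ones retry next tick."""
    return bool(result.skipped_unassigned)


if __name__ == "__main__":
    res = DispatchResultSketch(
        skipped_per_profile_capped=[("t1", "alpha", 2), ("t2", "alpha", 2)],
    )
    print(busy_profiles(res), needs_operator(res))
```

Using `field(default_factory=list)` gives every result its own empty lists, which is why the schema tests can assert `== []` on a fresh instance.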
@@ -5342,6 +5356,8 @@ def dispatch_once(
     failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
     stale_timeout_seconds: int = 0,
     board: Optional[str] = None,
+    default_assignee: Optional[str] = None,
+    max_in_progress_per_profile: Optional[int] = None,
 ) -> DispatchResult:
     """Run one dispatcher tick.
 
@@ -5427,12 +5443,89 @@ def dispatch_once(
     if max_spawn is None or max_spawn > remaining:
         max_spawn = remaining
     spawned = 0
+    # Per-profile concurrency cap (#21582): when set, track how many
+    # workers each assignee already has in flight, and refuse to spawn
+    # when this would push that assignee past the cap. Prevents
+    # fan-out workloads from melting a single profile's local model /
+    # API quota / browser pool while leaving other profiles idle.
+    # Tasks blocked this way go to skipped_per_profile_capped (not
+    # skipped_unassigned — the operator-actionable signal is different:
+    # "this profile is busy, try again later" not "this needs routing").
+    _per_profile_cap = max_in_progress_per_profile if (
+        isinstance(max_in_progress_per_profile, int)
+        and max_in_progress_per_profile > 0
+    ) else None
+    _per_profile_running: dict[str, int] = {}
+    if _per_profile_cap is not None:
+        for prow in conn.execute(
+            "SELECT assignee, COUNT(*) AS n FROM tasks "
+            "WHERE status = 'running' AND assignee IS NOT NULL "
+            "GROUP BY assignee"
+        ):
+            _per_profile_running[prow["assignee"]] = int(prow["n"])
+    # Normalize default_assignee once: empty/whitespace string → None so the
+    # rest of the loop can use ``if default_assignee:`` as a single check.
+    # We also resolve profile_exists once here for the same reason.
+    _default_assignee = (default_assignee or "").strip() or None
+    _default_assignee_resolved = False
+    if _default_assignee:
+        try:
+            from hermes_cli.profiles import profile_exists as _pe
+            _default_assignee_resolved = bool(_pe(_default_assignee))
+        except Exception:
+            # Profiles module not importable (test stubs, exotic envs).
+            # Trust the operator's config and try the assignment; the
+            # downstream profile_exists check on the assigned row will
+            # bucket it as nonspawnable if the profile genuinely isn't
+            # there, with the existing diagnostic.
+            _default_assignee_resolved = True
     for row in ready_rows:
         if max_spawn is not None and running_count + spawned >= max_spawn:
             break
-        if not row["assignee"]:
-            result.skipped_unassigned.append(row["id"])
-            continue
+        row_assignee = row["assignee"]
+        if not row_assignee:
+            # Honour kanban.default_assignee: when the dispatcher hits an
+            # unassigned ready task and an operator-configured fallback
+            # exists, persist the assignment and proceed. This removes the
+            # dashboard footgun where a task created without an assignee
+            # parks in 'ready' forever even though the operator's intent
+            # ("default") was perfectly clear (#27145). Mutating the row
+            # (not just the in-memory view) keeps diagnostics and the
+            # board state consistent: the task is now legitimately owned
+            # by ``kanban.default_assignee``, not "unassigned but secretly
+            # routed".
+            if _default_assignee and _default_assignee_resolved:
+                # Dry-run: show what WOULD happen (auto-assign + spawn) without
+                # mutating the DB. Real run: mutate the row + emit the
+                # 'assigned' event so the board state matches what just happened.
+                if not dry_run:
+                    try:
+                        with write_txn(conn):
+                            conn.execute(
+                                "UPDATE tasks SET assignee = ? WHERE id = ? "
+                                "AND (assignee IS NULL OR assignee = '')",
+                                (_default_assignee, row["id"]),
+                            )
+                            _append_event(
+                                conn, row["id"], "assigned",
+                                {
+                                    "assignee": _default_assignee,
+                                    "source": "kanban.default_assignee",
+                                },
+                            )
+                    except Exception:
+                        _log.debug(
+                            "kanban dispatch: failed to apply default_assignee=%r "
+                            "to task %s",
+                            _default_assignee, row["id"], exc_info=True,
+                        )
+                        result.skipped_unassigned.append(row["id"])
+                        continue
+                row_assignee = _default_assignee
+                result.auto_assigned_default.append(row["id"])
+            else:
+                result.skipped_unassigned.append(row["id"])
+                continue
         # Skip ready tasks whose assignee is not a real Hermes profile.
         # `_default_spawn` invokes ``hermes -p <assignee>`` which fails
         # with "Profile 'X' does not exist" when the assignee names a
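The hunk above relies on two SQL statements: a grouped count of running tasks per assignee, and an UPDATE guarded so it only touches rows that are still unassigned. The following sketch runs both against an in-memory SQLite database. The three-column `tasks` table and the helper names (`make_demo_db`, `running_counts`, `apply_default`) are assumptions made for illustration; the real kanban schema has more columns and wraps writes in `write_txn`.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Build a throwaway DB with a hypothetical, minimal tasks table."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # allows row["assignee"] access
    conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, assignee TEXT)")
    conn.executemany(
        "INSERT INTO tasks (id, status, assignee) VALUES (?, ?, ?)",
        [
            ("t1", "running", "alpha"),
            ("t2", "running", "alpha"),
            ("t3", "running", "beta"),
            ("t4", "ready", None),
            ("t5", "ready", "beta"),
        ],
    )
    return conn


def running_counts(conn: sqlite3.Connection) -> dict[str, int]:
    """One query per tick: how many tasks each assignee has running."""
    rows = conn.execute(
        "SELECT assignee, COUNT(*) AS n FROM tasks "
        "WHERE status = 'running' AND assignee IS NOT NULL "
        "GROUP BY assignee"
    )
    return {row["assignee"]: int(row["n"]) for row in rows}


def apply_default(conn: sqlite3.Connection, task_id: str, fallback: str) -> bool:
    """Assign the fallback only if the row is still unassigned."""
    cur = conn.execute(
        "UPDATE tasks SET assignee = ? WHERE id = ? "
        "AND (assignee IS NULL OR assignee = '')",
        (fallback, task_id),
    )
    return cur.rowcount == 1


if __name__ == "__main__":
    db = make_demo_db()
    print(running_counts(db))
    print(apply_default(db, "t4", "default"), apply_default(db, "t5", "default"))
```

The `assignee IS NULL OR assignee = ''` guard makes the write a no-op if someone assigned the task between the read and the update, so an explicit assignment is never overwritten by the fallback.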
@@ -5447,7 +5540,7 @@ def dispatch_once(
             from hermes_cli.profiles import profile_exists  # local import: avoids cycle
         except Exception:
             profile_exists = None  # type: ignore[assignment]
-        if profile_exists is not None and not profile_exists(row["assignee"]):
+        if profile_exists is not None and not profile_exists(row_assignee):
             # Bucket separately from skipped_unassigned: the operator
             # cannot fix this by assigning a profile (the assignee IS the
             # intended owner — a terminal lane). Health telemetry uses
@@ -5456,6 +5549,19 @@ def dispatch_once(
             # of human-pulled work.
             result.skipped_nonspawnable.append(row["id"])
             continue
+        # Per-profile concurrency cap (#21582): even if there's global
+        # headroom, refuse to spawn for an assignee that's already at
+        # its in-flight cap. Prevents one profile's local model / API
+        # quota / browser pool from being overwhelmed by a fan-out
+        # while the global max_in_progress / max_spawn caps still allow
+        # work on OTHER profiles.
+        if _per_profile_cap is not None:
+            current = _per_profile_running.get(row_assignee, 0)
+            if current >= _per_profile_cap:
+                result.skipped_per_profile_capped.append(
+                    (row["id"], row_assignee, current)
+                )
+                continue
         # Respawn guard: refuse to re-spawn when useful work is already
         # in-flight/recent, or when the last failure is a deterministic
         # blocker (quota / auth). The guard defers the spawn this tick so
@@ -5478,7 +5584,15 @@ def dispatch_once(
             )
             continue
         if dry_run:
-            result.spawned.append((row["id"], row["assignee"], ""))
+            result.spawned.append((row["id"], row_assignee, ""))
+            # Increment per-profile counter even in dry_run so the cap
+            # check sees the would-be spawn on subsequent iterations.
+            # Without this, dry_run reports every task as spawnable and
+            # under-reports the capped subset (#21582).
+            if _per_profile_cap is not None and row_assignee:
+                _per_profile_running[row_assignee] = (
+                    _per_profile_running.get(row_assignee, 0) + 1
+                )
             continue
         claimed = claim_task(conn, row["id"], ttl_seconds=ttl_seconds)
         if claimed is None:
@@ -5521,6 +5635,13 @@ def dispatch_once(
             # complete_task).
             result.spawned.append((claimed.id, claimed.assignee or "", str(workspace)))
             spawned += 1
+            # Track the new in-flight count for this profile so later
+            # iterations in this same tick respect the per-profile cap
+            # (#21582). Subsequent ticks re-query from the DB.
+            if _per_profile_cap is not None and claimed.assignee:
+                _per_profile_running[claimed.assignee] = (
+                    _per_profile_running.get(claimed.assignee, 0) + 1
+                )
         except Exception as exc:
             auto = _record_spawn_failure(
                 conn, claimed.id, str(exc),

154
tests/hermes_cli/test_kanban_default_assignee.py
Normal file
@@ -0,0 +1,154 @@
+"""Regression tests for #27145 — kanban.default_assignee for unassigned ready tasks.
+
+When the dispatcher hits an unassigned ready task and ``kanban.default_assignee``
+is set, the dispatcher applies the assignment and spawns. Without the config,
+the task is skipped (existing behavior preserved).
+"""
+from __future__ import annotations
+
+import json
+import os
+import sys
+import tempfile
+
+import pytest
+
+
+@pytest.fixture()
+def isolated_kanban_home(monkeypatch):
+    """Spin up a fresh HERMES_HOME with a clean kanban DB."""
+    test_home = tempfile.mkdtemp(prefix="kanban_default_assignee_test_")
+    monkeypatch.setenv("HERMES_HOME", test_home)
+    # Force-reimport so the fresh HERMES_HOME is picked up.
+    for mod in list(sys.modules.keys()):
+        if mod.startswith("hermes_cli") or mod.startswith("hermes_state") or mod == "hermes_constants":
+            del sys.modules[mod]
+    from hermes_cli import kanban_db
+    yield kanban_db, test_home
+    # Cleanup is best-effort; tempfile dir survives but pytest isolation
+    # gives each test its own monkeypatched HERMES_HOME so no cross-test
+    # contamination.
+
+
+def _fake_spawn(*args, **kwargs):
+    """Stand-in for the real worker spawn — returns a fake PID."""
+    return 12345
+
+
+def test_unassigned_task_skipped_without_default_assignee(isolated_kanban_home):
+    """Baseline: with no default_assignee, an unassigned ready task is
+    skipped via the existing `skipped_unassigned` bucket and the DB row
+    is untouched."""
+    kb, _home = isolated_kanban_home
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        task_id = kb.create_task(conn, title="t1", assignee=None)
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(conn, spawn_fn=_fake_spawn, dry_run=False)
+    assert res.skipped_unassigned == [task_id]
+    assert not res.auto_assigned_default
+    assert not res.spawned
+    with kb.connect_closing() as conn:
+        row = conn.execute("SELECT assignee FROM tasks WHERE id = ?", (task_id,)).fetchone()
+    assert row["assignee"] is None
+
+
+def test_unassigned_task_auto_assigned_with_default_assignee(isolated_kanban_home):
+    """Core #27145 contract: with default_assignee set, an unassigned ready
+    task gets the assignment applied and dispatched on the same tick. The
+    DB row is mutated (assignee column + an 'assigned' event)."""
+    kb, _home = isolated_kanban_home
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        task_id = kb.create_task(conn, title="t1", assignee=None)
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=False,
+            default_assignee="default",
+        )
+    assert res.auto_assigned_default == [task_id]
+    assert not res.skipped_unassigned
+    assert len(res.spawned) == 1
+    assert res.spawned[0][0] == task_id
+    assert res.spawned[0][1] == "default"
+
+    with kb.connect_closing() as conn:
+        row = conn.execute("SELECT assignee FROM tasks WHERE id = ?", (task_id,)).fetchone()
+    assert row["assignee"] == "default"
+
+    # 'assigned' event emitted for the audit trail
+    with kb.connect_closing() as conn:
+        evs = list(conn.execute(
+            "SELECT kind, payload FROM task_events WHERE task_id = ? AND kind = 'assigned'",
+            (task_id,),
+        ))
+    assert len(evs) == 1
+    payload = json.loads(evs[0][1])
+    assert payload["assignee"] == "default"
+    assert payload["source"] == "kanban.default_assignee"
+
+
+def test_dry_run_with_default_assignee_reports_without_mutating(isolated_kanban_home):
+    """Dry-run mode: reports what WOULD happen (task in auto_assigned_default,
+    spawn entry) but does NOT mutate the DB. Operators using
+    `hermes kanban dispatch --dry-run` see the routing decision before
+    committing."""
+    kb, _home = isolated_kanban_home
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        task_id = kb.create_task(conn, title="t1", assignee=None)
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=True,
+            default_assignee="default",
+        )
+    assert res.auto_assigned_default == [task_id]
+    assert len(res.spawned) == 1
+    with kb.connect_closing() as conn:
+        row = conn.execute("SELECT assignee FROM tasks WHERE id = ?", (task_id,)).fetchone()
+    # DB unchanged — dry_run did not commit the assignment.
+    assert row["assignee"] is None
+
+
+def test_whitespace_default_assignee_treated_as_none(isolated_kanban_home):
+    """Empty / whitespace-only default_assignee values must be treated as
+    'no fallback set' so a misconfigured kanban.default_assignee=' '
+    doesn't surprise operators by silently routing unassigned tasks."""
+    kb, _home = isolated_kanban_home
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        task_id = kb.create_task(conn, title="t1", assignee=None)
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=False,
+            default_assignee=" ",
+        )
+    assert task_id in res.skipped_unassigned
+    assert not res.auto_assigned_default
+
+
+def test_explicitly_assigned_task_untouched_by_default_assignee(isolated_kanban_home):
+    """A task with an explicit assignee must NOT be touched by the
+    default_assignee logic — that fallback only applies to genuinely
+    unassigned rows."""
+    kb, _home = isolated_kanban_home
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        task_id = kb.create_task(conn, title="t1", assignee="default")
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=False,
+            default_assignee="someother",
+        )
+    assert task_id not in res.auto_assigned_default
+    assert any(s[0] == task_id and s[1] == "default" for s in res.spawned)
+
+
+def test_dispatch_result_has_auto_assigned_default_field():
+    """Schema-level invariant: DispatchResult exposes the
+    auto_assigned_default field so CLI / dashboard / gateway can surface
+    the new routing decisions."""
+    from hermes_cli.kanban_db import DispatchResult
+    r = DispatchResult()
+    assert hasattr(r, "auto_assigned_default")
+    assert r.auto_assigned_default == []

167
tests/hermes_cli/test_kanban_per_profile_cap.py
Normal file
@@ -0,0 +1,167 @@
+"""Regression tests for #21582 — per-profile concurrency cap in dispatcher.
+
+When ``kanban.max_in_progress_per_profile`` is set, no single profile
+gets more than N workers running at once even if the global
+``max_in_progress`` cap would allow it. Prevents one profile's local
+model / API quota / browser pool from being overwhelmed by a fan-out.
+"""
+from __future__ import annotations
+
+import os
+import sys
+import tempfile
+
+import pytest
+
+
+@pytest.fixture()
+def isolated_kanban_home_with_profiles(monkeypatch):
+    """Spin up a fresh HERMES_HOME with kanban DB + alpha/beta profiles."""
+    test_home = tempfile.mkdtemp(prefix="kanban_per_profile_cap_test_")
+    for prof in ("alpha", "beta", "default"):
+        os.makedirs(os.path.join(test_home, "profiles", prof), exist_ok=True)
+    monkeypatch.setenv("HERMES_HOME", test_home)
+    for mod in list(sys.modules.keys()):
+        if mod.startswith("hermes_cli") or mod.startswith("hermes_state") or mod == "hermes_constants":
+            del sys.modules[mod]
+    from hermes_cli import kanban_db
+    yield kanban_db
+
+
+def _fake_spawn(*args, **kwargs):
+    return 12345
+
+
+def test_no_cap_all_tasks_dispatched(isolated_kanban_home_with_profiles):
+    """Baseline: with no per-profile cap, all ready tasks dispatch."""
+    kb = isolated_kanban_home_with_profiles
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        for i in range(5):
+            kb.create_task(conn, title=f"a{i}", assignee="alpha")
+        for i in range(3):
+            kb.create_task(conn, title=f"b{i}", assignee="beta")
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(conn, spawn_fn=_fake_spawn, dry_run=True)
+    assert len(res.spawned) == 8
+    assert not res.skipped_per_profile_capped
+
+
+def test_cap_2_balances_two_profiles(isolated_kanban_home_with_profiles):
+    """With cap=2: 2 alpha + 2 beta dispatched; remaining 3 alpha + 1 beta
+    deferred to skipped_per_profile_capped."""
+    kb = isolated_kanban_home_with_profiles
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        for i in range(5):
+            kb.create_task(conn, title=f"a{i}", assignee="alpha")
+        for i in range(3):
+            kb.create_task(conn, title=f"b{i}", assignee="beta")
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=True,
+            max_in_progress_per_profile=2,
+        )
+    spawn_assignees = [s[1] for s in res.spawned]
+    capped_assignees = [c[1] for c in res.skipped_per_profile_capped]
+    assert spawn_assignees.count("alpha") == 2
+    assert spawn_assignees.count("beta") == 2
+    assert capped_assignees.count("alpha") == 3
+    assert capped_assignees.count("beta") == 1
+
+
+def test_pre_existing_running_counts_against_cap(isolated_kanban_home_with_profiles):
+    """A task already in 'running' status when dispatch_once starts counts
+    toward the per-profile cap. With 1 alpha pre-running and cap=1, NO new
+    alpha tasks should spawn; beta is independent so 1 beta spawns."""
+    kb = isolated_kanban_home_with_profiles
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        running_alpha = kb.create_task(conn, title="running alpha", assignee="alpha")
+        with kb.write_txn(conn):
+            conn.execute(
+                "UPDATE tasks SET status = 'running', claim_lock = 'test:1' WHERE id = ?",
+                (running_alpha,),
+            )
+        for i in range(2):
+            kb.create_task(conn, title=f"a{i}", assignee="alpha")
+        for i in range(2):
+            kb.create_task(conn, title=f"b{i}", assignee="beta")
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=True,
+            max_in_progress_per_profile=1,
+        )
+    spawn_assignees = [s[1] for s in res.spawned]
+    capped_assignees = [c[1] for c in res.skipped_per_profile_capped]
+    assert spawn_assignees.count("alpha") == 0
+    assert spawn_assignees.count("beta") == 1
+    assert capped_assignees.count("alpha") == 2
+    assert capped_assignees.count("beta") == 1
+
+
+@pytest.mark.parametrize("cap", [0, -1, "abc", None])
+def test_invalid_cap_treated_as_no_cap(isolated_kanban_home_with_profiles, cap):
+    """Cap values that don't represent a positive int should be treated as
+    'no cap' — silently falling through rather than crashing the dispatcher."""
+    kb = isolated_kanban_home_with_profiles
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        for i in range(3):
+            kb.create_task(conn, title=f"a{i}", assignee="alpha")
+    with kb.connect_closing() as conn:
+        res = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=True,
+            max_in_progress_per_profile=cap,
+        )
+    assert not res.skipped_per_profile_capped
+    assert len(res.spawned) == 3
+
+
+def test_capped_tasks_dispatched_on_subsequent_tick(isolated_kanban_home_with_profiles):
+    """A task deferred this tick because its profile was at cap should be
+    eligible for dispatch on the next tick (after running tasks complete).
+    This verifies the cap is per-tick state, not a permanent block."""
+    kb = isolated_kanban_home_with_profiles
+    with kb.connect_closing() as conn:
+        kb.create_board(slug="default", name="Test")
+        ids = [kb.create_task(conn, title=f"a{i}", assignee="alpha") for i in range(3)]
+
+    # First tick: cap=1, only 1 alpha dispatched
+    with kb.connect_closing() as conn:
+        res1 = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=False,
+            max_in_progress_per_profile=1,
+        )
+    assert len(res1.spawned) == 1
+    assert len(res1.skipped_per_profile_capped) == 2
+
+    # Simulate the running task completing — set it back to done so the
+    # 'running' count drops
+    spawned_id = res1.spawned[0][0]
+    with kb.connect_closing() as conn:
+        with kb.write_txn(conn):
+            conn.execute(
+                "UPDATE tasks SET status = 'done', claim_lock = NULL WHERE id = ?",
+                (spawned_id,),
+            )
+
+    # Second tick: 1 more alpha should now dispatch
+    with kb.connect_closing() as conn:
+        res2 = kb.dispatch_once(
+            conn, spawn_fn=_fake_spawn, dry_run=False,
+            max_in_progress_per_profile=1,
+        )
+    assert len(res2.spawned) == 1
+    assert len(res2.skipped_per_profile_capped) == 1
+    assert res2.spawned[0][0] != spawned_id  # different task this time
+
+
+def test_dispatch_result_has_skipped_per_profile_capped_field():
+    """Schema-level invariant: DispatchResult exposes the
+    skipped_per_profile_capped field as a list of
+    (task_id, assignee, current_running) tuples."""
+    from hermes_cli.kanban_db import DispatchResult
+    r = DispatchResult()
+    assert hasattr(r, "skipped_per_profile_capped")
+    assert r.skipped_per_profile_capped == []