fix(kanban): cap dispatch by running workers

This commit is contained in:
guglielmofonda 2026-05-07 13:35:21 -07:00 committed by Teknium
parent cede612987
commit 845be254ec
2 changed files with 60 additions and 1 deletions

View file

@ -3660,6 +3660,14 @@ def dispatch_once(
result.timed_out = enforce_max_runtime(conn)
result.promoted = recompute_ready(conn)
running_count = 0
if max_spawn is not None:
running_count = int(
conn.execute(
"SELECT COUNT(*) FROM tasks WHERE status = 'running'"
).fetchone()[0]
)
ready_rows = conn.execute(
"SELECT id, assignee FROM tasks "
"WHERE status = 'ready' AND claim_lock IS NULL "
@ -3667,7 +3675,7 @@ def dispatch_once(
).fetchall()
spawned = 0
for row in ready_rows:
if max_spawn is not None and spawned >= max_spawn:
if max_spawn is not None and running_count + spawned >= max_spawn:
break
if not row["assignee"]:
result.skipped_unassigned.append(row["id"])

View file

@ -605,6 +605,57 @@ def test_dispatch_spawn_failure_releases_claim(kanban_home, all_assignees_spawna
assert kb.get_task(conn, t).claim_lock is None
def test_dispatch_max_spawn_counts_existing_running_tasks(
kanban_home, all_assignees_spawnable
):
"""max_spawn is a live concurrency cap, not a per-tick spawn cap.
Without counting tasks already in ``running``, every dispatcher tick can
launch up to ``max_spawn`` more workers while previous workers are still
alive. Long-running boards then accumulate unbounded worker subprocesses.
"""
spawns = []
def fake_spawn(task, workspace):
spawns.append(task.id)
with kb.connect() as conn:
running_a = kb.create_task(conn, title="running-a", assignee="alice")
running_b = kb.create_task(conn, title="running-b", assignee="bob")
ready = kb.create_task(conn, title="ready", assignee="carol")
kb.claim_task(conn, running_a)
kb.claim_task(conn, running_b)
res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
assert res.spawned == []
assert spawns == []
assert kb.get_task(conn, ready).status == "ready"
def test_dispatch_max_spawn_fills_remaining_capacity(
kanban_home, all_assignees_spawnable
):
"""When below cap, dispatch only fills available worker slots."""
spawns = []
def fake_spawn(task, workspace):
spawns.append(task.id)
with kb.connect() as conn:
running = kb.create_task(conn, title="running", assignee="alice")
ready_a = kb.create_task(conn, title="ready-a", assignee="bob")
ready_b = kb.create_task(conn, title="ready-b", assignee="carol")
kb.claim_task(conn, running)
res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
assert len(res.spawned) == 1
assert spawns == [ready_a]
assert kb.get_task(conn, ready_a).status == "running"
assert kb.get_task(conn, ready_b).status == "ready"
def test_dispatch_reclaims_stale_before_spawning(kanban_home):
with kb.connect() as conn:
t = kb.create_task(conn, title="x", assignee="alice")