From 845be254ec4b14b9c71f7505eb393d85c665882e Mon Sep 17 00:00:00 2001
From: guglielmofonda
Date: Thu, 7 May 2026 13:35:21 -0700
Subject: [PATCH] fix(kanban): cap dispatch by running workers

---
 hermes_cli/kanban_db.py            | 10 +++++-
 tests/hermes_cli/test_kanban_db.py | 51 ++++++++++++++++++++++++++++++
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py
index 86b399a9671..967db790efa 100644
--- a/hermes_cli/kanban_db.py
+++ b/hermes_cli/kanban_db.py
@@ -3660,6 +3660,14 @@ def dispatch_once(
     result.timed_out = enforce_max_runtime(conn)
     result.promoted = recompute_ready(conn)
 
+    running_count = 0
+    if max_spawn is not None:
+        running_count = int(
+            conn.execute(
+                "SELECT COUNT(*) FROM tasks WHERE status = 'running'"
+            ).fetchone()[0]
+        )
+
     ready_rows = conn.execute(
         "SELECT id, assignee FROM tasks "
         "WHERE status = 'ready' AND claim_lock IS NULL "
@@ -3667,7 +3675,7 @@ def dispatch_once(
     ).fetchall()
     spawned = 0
     for row in ready_rows:
-        if max_spawn is not None and spawned >= max_spawn:
+        if max_spawn is not None and running_count + spawned >= max_spawn:
             break
         if not row["assignee"]:
             result.skipped_unassigned.append(row["id"])
diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py
index c75b02d7ebf..a00f0b2d588 100644
--- a/tests/hermes_cli/test_kanban_db.py
+++ b/tests/hermes_cli/test_kanban_db.py
@@ -605,6 +605,57 @@ def test_dispatch_spawn_failure_releases_claim(kanban_home, all_assignees_spawna
         assert kb.get_task(conn, t).claim_lock is None
 
 
+def test_dispatch_max_spawn_counts_existing_running_tasks(
+    kanban_home, all_assignees_spawnable
+):
+    """max_spawn is a live concurrency cap, not a per-tick spawn cap.
+
+    Without counting tasks already in ``running``, every dispatcher tick can
+    launch up to ``max_spawn`` more workers while previous workers are still
+    alive. Long-running boards then accumulate unbounded worker subprocesses.
+    """
+    spawns = []
+
+    def fake_spawn(task, workspace):
+        spawns.append(task.id)
+
+    with kb.connect() as conn:
+        running_a = kb.create_task(conn, title="running-a", assignee="alice")
+        running_b = kb.create_task(conn, title="running-b", assignee="bob")
+        ready = kb.create_task(conn, title="ready", assignee="carol")
+        kb.claim_task(conn, running_a)
+        kb.claim_task(conn, running_b)
+
+        res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
+
+        assert res.spawned == []
+        assert spawns == []
+        assert kb.get_task(conn, ready).status == "ready"
+
+
+def test_dispatch_max_spawn_fills_remaining_capacity(
+    kanban_home, all_assignees_spawnable
+):
+    """When below cap, dispatch only fills available worker slots."""
+    spawns = []
+
+    def fake_spawn(task, workspace):
+        spawns.append(task.id)
+
+    with kb.connect() as conn:
+        running = kb.create_task(conn, title="running", assignee="alice")
+        ready_a = kb.create_task(conn, title="ready-a", assignee="bob")
+        ready_b = kb.create_task(conn, title="ready-b", assignee="carol")
+        kb.claim_task(conn, running)
+
+        res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
+
+        assert len(res.spawned) == 1
+        assert spawns == [ready_a]
+        assert kb.get_task(conn, ready_a).status == "running"
+        assert kb.get_task(conn, ready_b).status == "ready"
+
+
 def test_dispatch_reclaims_stale_before_spawning(kanban_home):
     with kb.connect() as conn:
         t = kb.create_task(conn, title="x", assignee="alice")