mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
feat(kanban): wire dispatcher to dispatch review agents from review column
Salvages #23772 by @thewillhuang. Adds 'review' as a valid kanban task status and extends dispatch_once to monitor the review column as a second dispatch source (in addition to the existing ready column).
- Adds 'review' to VALID_STATUSES
- Adds claim_review_task() — atomically transitions review → running
- Adds has_spawnable_review() — health telemetry mirror
- Extends dispatch_once with a review column dispatch loop
- Review agents get 'sdlc-review' skill auto-loaded

Resolved 2 conflicts (VALID_STATUSES merge with main's 'scheduled' state, test file additions). Adapted claim_review_task to main's ttl_seconds: Optional[int] = None convention (matches claim_task).
This commit is contained in:
parent
31fe229039
commit
f55d94a1e0
3 changed files with 354 additions and 1 deletions
|
|
@ -4954,6 +4954,8 @@ class GatewayRunner:
|
||||||
conn = _kb.connect(board=slug)
|
conn = _kb.connect(board=slug)
|
||||||
if _kb.has_spawnable_ready(conn):
|
if _kb.has_spawnable_ready(conn):
|
||||||
return True
|
return True
|
||||||
|
if _kb.has_spawnable_review(conn):
|
||||||
|
return True
|
||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
finally:
|
finally:
|
||||||
|
|
|
||||||
|
|
@ -94,7 +94,7 @@ _log = logging.getLogger(__name__)
|
||||||
# Constants
|
# Constants
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
VALID_STATUSES = {"triage", "todo", "scheduled", "ready", "running", "blocked", "done", "archived"}
|
VALID_STATUSES = {"triage", "todo", "scheduled", "ready", "running", "blocked", "review", "done", "archived"}
|
||||||
VALID_INITIAL_STATUSES = {"running", "blocked"}
|
VALID_INITIAL_STATUSES = {"running", "blocked"}
|
||||||
VALID_WORKSPACE_KINDS = {"scratch", "worktree", "dir"}
|
VALID_WORKSPACE_KINDS = {"scratch", "worktree", "dir"}
|
||||||
KNOWN_TOOLSET_NAMES = frozenset(name.casefold() for name in get_toolset_names())
|
KNOWN_TOOLSET_NAMES = frozenset(name.casefold() for name in get_toolset_names())
|
||||||
|
|
@ -2132,6 +2132,81 @@ def claim_task(
|
||||||
return get_task(conn, task_id)
|
return get_task(conn, task_id)
|
||||||
|
|
||||||
|
|
||||||
|
def claim_review_task(
    conn: sqlite3.Connection,
    task_id: str,
    *,
    ttl_seconds: Optional[int] = None,
    claimer: Optional[str] = None,
) -> Optional[Task]:
    """Claim a task out of the review column (``review -> running``).

    The claim is a single guarded ``UPDATE``: it only matches a row that
    is in ``review`` status and has no ``claim_lock``. When that update
    does not touch exactly one row, nothing else is written and ``None``
    is returned. Otherwise the freshly read ``Task`` is returned.

    In contrast to ``claim_task`` (``ready -> running``), no parent
    dependency check happens here — the task already went through that
    gate on its original ``todo -> ready -> running`` path.

    A successful claim also inserts a new ``task_runs`` row, points the
    task's ``current_run_id`` at it, and appends a ``claimed`` event, so
    the review agent's lifecycle is tracked separately from the original
    worker run.

    Args:
        conn: Open database connection.
        task_id: Id of the task to claim.
        ttl_seconds: Claim lifetime, passed to
            ``_resolve_claim_ttl_seconds``.
        claimer: Lock identity to record; falls back to ``_claimer_id()``
            when falsy.
    """
    claimed_at = int(time.time())
    holder = claimer or _claimer_id()
    lease_end = claimed_at + _resolve_claim_ttl_seconds(ttl_seconds)

    with write_txn(conn):
        # Guarded update: the WHERE clause is what makes the claim atomic.
        update = conn.execute(
            """
            UPDATE tasks
            SET status = 'running',
                claim_lock = ?,
                claim_expires = ?,
                started_at = COALESCE(started_at, ?)
            WHERE id = ?
              AND status = 'review'
              AND claim_lock IS NULL
            """,
            (holder, lease_end, claimed_at, task_id),
        )
        if update.rowcount != 1:
            return None

        # Copy the task fields that the new run row mirrors.
        task_row = conn.execute(
            "SELECT assignee, max_runtime_seconds, current_step_key "
            "FROM tasks WHERE id = ?",
            (task_id,),
        ).fetchone()
        if task_row:
            profile = task_row["assignee"]
            step_key = task_row["current_step_key"]
            max_runtime = task_row["max_runtime_seconds"]
        else:
            profile = None
            step_key = None
            max_runtime = None

        inserted = conn.execute(
            """
            INSERT INTO task_runs (
                task_id, profile, step_key, status,
                claim_lock, claim_expires, max_runtime_seconds,
                started_at
            ) VALUES (?, ?, ?, 'running', ?, ?, ?, ?)
            """,
            (
                task_id,
                profile,
                step_key,
                holder,
                lease_end,
                max_runtime,
                claimed_at,
            ),
        )
        run_id = inserted.lastrowid

        conn.execute(
            "UPDATE tasks SET current_run_id = ? WHERE id = ?",
            (run_id, task_id),
        )
        event_payload = {
            "lock": holder,
            "expires": lease_end,
            "run_id": run_id,
            "source_status": "review",
        }
        _append_event(conn, task_id, "claimed", event_payload, run_id=run_id)

    return get_task(conn, task_id)
|
||||||
|
|
||||||
|
|
||||||
def heartbeat_claim(
|
def heartbeat_claim(
|
||||||
conn: sqlite3.Connection,
|
conn: sqlite3.Connection,
|
||||||
task_id: str,
|
task_id: str,
|
||||||
|
|
@ -4165,6 +4240,31 @@ def has_spawnable_ready(conn: sqlite3.Connection) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def has_spawnable_review(conn: sqlite3.Connection) -> bool:
    """Report whether any review task could be handed to a review agent.

    A task qualifies when it is in ``review`` status, has an assignee,
    carries no ``claim_lock``, and its assignee is accepted by
    ``hermes_cli.profiles.profile_exists``.

    Review-column counterpart of :func:`has_spawnable_ready`; the health
    telemetry uses it to decide whether the dispatcher should have
    spawned a review agent.

    If ``hermes_cli.profiles`` cannot be imported, the profile check is
    skipped and any candidate row counts as spawnable.
    """
    candidates = conn.execute(
        "SELECT DISTINCT assignee FROM tasks "
        "WHERE status = 'review' AND assignee IS NOT NULL "
        " AND claim_lock IS NULL"
    ).fetchall()
    if not candidates:
        return False

    try:
        # Imported here rather than at module level to avoid an import cycle.
        from hermes_cli.profiles import profile_exists
    except Exception:
        return True

    return any(profile_exists(candidate["assignee"]) for candidate in candidates)
|
||||||
|
|
||||||
|
|
||||||
def dispatch_once(
|
def dispatch_once(
|
||||||
conn: sqlite3.Connection,
|
conn: sqlite3.Connection,
|
||||||
*,
|
*,
|
||||||
|
|
@ -4364,6 +4464,80 @@ def dispatch_once(
|
||||||
)
|
)
|
||||||
if auto:
|
if auto:
|
||||||
result.auto_blocked.append(claimed.id)
|
result.auto_blocked.append(claimed.id)
|
||||||
|
|
||||||
|
# ---- review column dispatch ----
|
||||||
|
# Review tasks are tasks that a worker moved to 'review' after
|
||||||
|
# creating a PR. The dispatcher spawns a review agent (loading
|
||||||
|
# sdlc-review skill) that verifies the PR and either merges (→ done)
|
||||||
|
# or rejects (→ back to running for the worker to fix).
|
||||||
|
#
|
||||||
|
# Same concurrency model as ready dispatch: review spawns count
|
||||||
|
# against max_spawn alongside ready tasks, so the total number of
|
||||||
|
# running workers stays bounded.
|
||||||
|
review_rows = conn.execute(
|
||||||
|
"SELECT id, assignee FROM tasks "
|
||||||
|
"WHERE status = 'review' AND claim_lock IS NULL "
|
||||||
|
"ORDER BY priority DESC, created_at ASC"
|
||||||
|
).fetchall()
|
||||||
|
for row in review_rows:
|
||||||
|
if max_spawn is not None and running_count + spawned >= max_spawn:
|
||||||
|
break
|
||||||
|
if not row["assignee"]:
|
||||||
|
result.skipped_unassigned.append(row["id"])
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
from hermes_cli.profiles import profile_exists
|
||||||
|
except Exception:
|
||||||
|
profile_exists = None # type: ignore[assignment]
|
||||||
|
if profile_exists is not None and not profile_exists(row["assignee"]):
|
||||||
|
result.skipped_nonspawnable.append(row["id"])
|
||||||
|
continue
|
||||||
|
if dry_run:
|
||||||
|
result.spawned.append((row["id"], row["assignee"], ""))
|
||||||
|
continue
|
||||||
|
claimed = claim_review_task(conn, row["id"], ttl_seconds=ttl_seconds)
|
||||||
|
if claimed is None:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
workspace = resolve_workspace(claimed, board=board)
|
||||||
|
except Exception as exc:
|
||||||
|
auto = _record_spawn_failure(
|
||||||
|
conn, claimed.id, f"workspace: {exc}",
|
||||||
|
failure_limit=failure_limit,
|
||||||
|
)
|
||||||
|
if auto:
|
||||||
|
result.auto_blocked.append(claimed.id)
|
||||||
|
continue
|
||||||
|
# Persist the resolved workspace path so the worker can cd there.
|
||||||
|
set_workspace_path(conn, claimed.id, str(workspace))
|
||||||
|
# Force-load sdlc-review skill for review agents. The
|
||||||
|
# _default_spawn function already auto-loads kanban-worker, and
|
||||||
|
# appends task.skills via --skills. Setting task.skills here
|
||||||
|
# means the review agent gets both kanban-worker (lifecycle)
|
||||||
|
# and sdlc-review (review logic: AC verification, merge, etc.).
|
||||||
|
claimed.skills = ["sdlc-review"]
|
||||||
|
_spawn = spawn_fn if spawn_fn is not None else _default_spawn
|
||||||
|
try:
|
||||||
|
import inspect
|
||||||
|
try:
|
||||||
|
sig = inspect.signature(_spawn)
|
||||||
|
if "board" in sig.parameters:
|
||||||
|
pid = _spawn(claimed, str(workspace), board=board)
|
||||||
|
else:
|
||||||
|
pid = _spawn(claimed, str(workspace))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
pid = _spawn(claimed, str(workspace))
|
||||||
|
if pid:
|
||||||
|
_set_worker_pid(conn, claimed.id, int(pid))
|
||||||
|
result.spawned.append((claimed.id, claimed.assignee or "", str(workspace)))
|
||||||
|
spawned += 1
|
||||||
|
except Exception as exc:
|
||||||
|
auto = _record_spawn_failure(
|
||||||
|
conn, claimed.id, str(exc),
|
||||||
|
failure_limit=failure_limit,
|
||||||
|
)
|
||||||
|
if auto:
|
||||||
|
result.auto_blocked.append(claimed.id)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ from __future__ import annotations
|
||||||
|
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
import os
|
import os
|
||||||
|
import sqlite3
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
@ -2055,3 +2056,179 @@ def test_dispatch_max_in_progress_none_is_unlimited(kanban_home, all_assignees_s
|
||||||
kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=None)
|
kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=None)
|
||||||
|
|
||||||
assert len(spawns) == 4, f"expected 4 spawns (unlimited), got {len(spawns)}"
|
assert len(spawns) == 4, f"expected 4 spawns (unlimited), got {len(spawns)}"
|
||||||
|
|
||||||
|
# Review column dispatch
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _set_task_status(conn: sqlite3.Connection, task_id: str, status: str) -> None:
    """Test helper: overwrite a task's ``status`` column with a raw UPDATE."""
    sql = "UPDATE tasks SET status = ? WHERE id = ?"
    params = (status, task_id)
    conn.execute(sql, params)
|
||||||
|
|
||||||
|
|
||||||
|
def test_claim_review_task_transitions_to_running(kanban_home):
    """Claiming a task that sits in ``review`` yields a locked ``running`` task."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")

        result = kb.claim_review_task(conn, task_id)

        assert result is not None
        assert result.status == "running"
        assert result.claim_lock is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_claim_review_task_fails_on_non_review(kanban_home):
    """A task outside the ``review`` column cannot be claimed for review."""
    with kb.connect() as conn:
        # Freshly created and never moved: the task is in 'ready', not 'review'.
        task_id = kb.create_task(conn, title="ready task", assignee="alice")

        result = kb.claim_review_task(conn, task_id)

        assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_claim_review_task_fails_when_already_claimed(kanban_home):
    """Only the first claim on a review task succeeds; a repeat gets ``None``."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")

        winner = kb.claim_review_task(conn, task_id)
        assert winner is not None

        loser = kb.claim_review_task(conn, task_id)
        assert loser is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_review_dry_run(kanban_home, all_assignees_spawnable):
    """A dry-run dispatch reports the review task as spawned without touching it."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, dry_run=True)

    assert len(outcome.spawned) == 1
    assert outcome.spawned[0][0] == task_id

    # Dry run must NOT mutate status.
    with kb.connect() as conn:
        assert kb.get_task(conn, task_id).status == "review"
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_review_spawns_with_correct_skills(
    kanban_home, all_assignees_spawnable,
):
    """The task handed to the spawn function carries the sdlc-review skill."""
    seen = []

    def capture_spawn(task, workspace, board=None):
        seen.append(task)
        return 42  # fake PID

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, spawn_fn=capture_spawn)

    assert len(outcome.spawned) == 1
    assert len(seen) == 1
    assert seen[0].skills == ["sdlc-review"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_review_skips_unassigned(kanban_home):
    """A review task without an assignee is reported as skipped_unassigned."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review floater")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, dry_run=True)

    assert task_id in outcome.skipped_unassigned
    assert not outcome.spawned
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_review_counts_toward_max_spawn(
    kanban_home, all_assignees_spawnable,
):
    """Ready and review spawns share one max_spawn budget."""
    spawned_ids = []

    def fake_spawn(task, workspace, board=None):
        spawned_ids.append(task.id)
        return 42

    with kb.connect() as conn:
        # Two ready tasks plus one review task, with a budget of two spawns.
        ready_a = kb.create_task(conn, title="ready 1", assignee="alice")
        ready_b = kb.create_task(conn, title="ready 2", assignee="bob")
        in_review = kb.create_task(conn, title="review", assignee="alice")
        _set_task_status(conn, in_review, "review")
        outcome = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)

    # Only 2 should spawn (ready tasks get priority in the loop)
    assert len(outcome.spawned) == 2
    assert len(spawned_ids) == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_review_spawns_when_ready_empty(
    kanban_home, all_assignees_spawnable,
):
    """A board holding only review tasks still gets them dispatched."""
    spawned_ids = []

    def fake_spawn(task, workspace, board=None):
        spawned_ids.append(task.id)
        return 42

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, spawn_fn=fake_spawn)

    assert len(outcome.spawned) == 1
    assert spawned_ids[0] == task_id
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_spawnable_review_true(kanban_home):
    """An unclaimed review task assigned to an existing profile is spawnable."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="default")
        _set_task_status(conn, task_id, "review")

        # default profile should exist in the test env
        spawnable = kb.has_spawnable_review(conn)

        assert spawnable is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_spawnable_review_false_on_empty(kanban_home):
    """With no review tasks on the board, nothing is spawnable."""
    with kb.connect() as conn:
        spawnable = kb.has_spawnable_review(conn)

        assert spawnable is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_spawnable_review_false_when_only_terminal_lanes(
    kanban_home, monkeypatch,
):
    """Review tasks whose assignee has no profile do not count as spawnable."""
    from hermes_cli import profiles

    def no_profile(name):
        return False

    monkeypatch.setattr(profiles, "profile_exists", no_profile)

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review", assignee="orion-cc")
        _set_task_status(conn, task_id, "review")

        spawnable = kb.has_spawnable_review(conn)

        assert spawnable is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_review_skips_nonspawnable(kanban_home, monkeypatch):
    """A review task whose assignee has no profile lands in skipped_nonspawnable."""
    from hermes_cli import profiles

    def no_profile(name):
        return False

    monkeypatch.setattr(profiles, "profile_exists", no_profile)

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review", assignee="orion-cc")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, dry_run=True)

    assert task_id in outcome.skipped_nonspawnable
    assert not outcome.spawned
|
||||||
|
|
||||||
|
|
||||||
|
def test_review_status_in_valid_statuses():
    """``VALID_STATUSES`` includes the ``review`` status."""
    known_statuses = kb.VALID_STATUSES
    assert "review" in known_statuses
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_review_does_not_claim_ready_tasks(
    kanban_home, all_assignees_spawnable,
):
    """``claim_review_task`` leaves a task in the ready column unclaimed."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="ready task", assignee="alice")

        # claim_review_task should NOT claim a ready task
        result = kb.claim_review_task(conn, task_id)

        assert result is None
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue