From 3ee7a5546dfcfe2120223a6ea09a7e6f7d49e10a Mon Sep 17 00:00:00 2001 From: Niraven <87259662+Niraven@users.noreply.github.com> Date: Mon, 18 May 2026 21:10:05 -0700 Subject: [PATCH] feat(cli): add kanban swarm topology helper Salvages #26791 by @Niraven. Adds 'hermes kanban swarm' to create a durable Kanban Swarm v1 graph: a completed root/blackboard card, parallel worker cards, a verifier gated on all workers, and a synthesizer gated on the verifier. Stores shared swarm blackboard updates as structured JSON comments on the root card. Self-contained: new hermes_cli/kanban_swarm.py module + CLI wiring + unit tests. --- hermes_cli/kanban.py | 54 +++++ hermes_cli/kanban_swarm.py | 279 ++++++++++++++++++++++++++ tests/hermes_cli/test_kanban_swarm.py | 118 +++++++++++ 3 files changed, 451 insertions(+) create mode 100644 hermes_cli/kanban_swarm.py create mode 100644 tests/hermes_cli/test_kanban_swarm.py diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index b9f0ce6f57c..2c2e8d3e5e7 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -24,6 +24,7 @@ from pathlib import Path from typing import Any, Optional from hermes_cli import kanban_db as kb +from hermes_cli import kanban_swarm as ks from hermes_cli.profiles import get_active_profile_name, get_profile_dir, seed_profile_skills @@ -313,6 +314,27 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu "to skip the brief running-to-blocked transition.") p_create.add_argument("--json", action="store_true", help="Emit JSON output") + # --- swarm --- + p_swarm = sub.add_parser( + "swarm", + help="Create a Kanban Swarm v1 graph (parallel workers → verifier → synthesizer)", + ) + p_swarm.add_argument("goal", help="Swarm goal / final outcome") + p_swarm.add_argument( + "--worker", + action="append", + default=[], + metavar="PROFILE:TITLE[:SKILL,SKILL]", + help="Parallel worker card (repeatable)", + ) + p_swarm.add_argument("--verifier", required=True, help="Verifier profile") + p_swarm.add_argument("--synthesizer", required=True, help="Synthesizer/writer profile") + p_swarm.add_argument("--tenant", default=None, help="Tenant namespace") + p_swarm.add_argument("--priority", type=int, default=0, help="Priority tiebreaker") + p_swarm.add_argument("--created-by", default=None, help="Creator/anchor profile") + p_swarm.add_argument("--idempotency-key", default=None, help="Dedup key for the root card") + p_swarm.add_argument("--json", action="store_true", help="Emit JSON output") + # --- list --- p_list = sub.add_parser("list", aliases=["ls"], help="List tasks") p_list.add_argument("--mine", action="store_true", @@ -778,6 +800,7 @@ def kanban_command(args: argparse.Namespace) -> int: handlers = { "init": _cmd_init, "create": _cmd_create, + "swarm": _cmd_swarm, "list": _cmd_list, "ls": _cmd_list, "show": _cmd_show, @@ -1212,6 +1235,37 @@ def _cmd_create(args: argparse.Namespace) -> int: return 0 +def _cmd_swarm(args: argparse.Namespace) -> int: + try: + workers = [ks.parse_worker_arg(raw) for raw in (args.worker or [])] + except ValueError as exc: + print(f"kanban swarm: {exc}", file=sys.stderr) + return 2 + if not workers: + print("kanban swarm: at least one --worker is required", file=sys.stderr) + return 2 + with kb.connect() as conn: + created = ks.create_swarm( + conn, + goal=args.goal, + workers=workers, + verifier_assignee=args.verifier, + synthesizer_assignee=args.synthesizer, + tenant=args.tenant, + created_by=args.created_by or _profile_author(), + priority=args.priority, + idempotency_key=getattr(args, "idempotency_key", None), + ) + if getattr(args, "json", False): + print(json.dumps(created.as_dict(), indent=2, ensure_ascii=False)) + else: + print(f"Swarm root: {created.root_id}") + print("Workers: " + ", ".join(created.worker_ids)) + print(f"Verifier: {created.verifier_id}") + print(f"Synthesizer: {created.synthesizer_id}") + return 0 + + def _cmd_list(args: argparse.Namespace) -> int: assignee = args.assignee if args.mine and not assignee: diff --git a/hermes_cli/kanban_swarm.py b/hermes_cli/kanban_swarm.py new file mode 100644 index 00000000000..2b0fa0b9e98 --- /dev/null +++ b/hermes_cli/kanban_swarm.py @@ -0,0 +1,279 @@ +"""Kanban Swarm v1: thin swarm topology helpers on top of Kanban. + +This module intentionally does not introduce a second scheduler. It writes a +small task graph into the existing Kanban kernel: + + planning root (completed immediately) + ├─ parallel specialist workers (ready) + └─ verifier (todo until all workers done) + └─ synthesizer (todo until verifier done) + +The shared blackboard is also deliberately low-tech: structured JSON comments on +the root task. That keeps all state in existing task_comments/task_events rows, +so the dashboard, notifier, slash command, and dispatcher keep working without a +new service. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +import json +import sqlite3 +from typing import Any, Iterable, Optional + +from hermes_cli import kanban_db as kb + +BLACKBOARD_PREFIX = "[swarm:blackboard] " + + +@dataclass(frozen=True) +class SwarmWorkerSpec: + """A single parallel worker card in a swarm.""" + + profile: str + title: str + body: str + skills: list[str] = field(default_factory=list) + priority: int = 0 + max_runtime_seconds: Optional[int] = None + + +@dataclass(frozen=True) +class SwarmCreated: + """IDs produced by :func:`create_swarm`.""" + + root_id: str + worker_ids: list[str] + verifier_id: str + synthesizer_id: str + + def as_dict(self) -> dict[str, Any]: + return { + "root_id": self.root_id, + "worker_ids": list(self.worker_ids), + "verifier_id": self.verifier_id, + "synthesizer_id": self.synthesizer_id, + } + + +def _require_text(value: str, field_name: str) -> str: + text = (value or "").strip() + if not text: + raise ValueError(f"{field_name} is required") + return text + + +def _swarm_context(root_id: str, goal: str) -> str: + return ( + "\n\n## Swarm protocol\n" + f"- Swarm root / shared blackboard: `{root_id}`.\n" + "- Read sibling/parent handoffs from Kanban context before working.\n" + "- Put machine-readable facts in completion metadata.\n" + "- Put cross-worker notes on the root task using structured comments.\n" + f"- Goal: {goal.strip()}\n" + ) + + +def create_swarm( + conn: sqlite3.Connection, + *, + goal: str, + workers: Iterable[SwarmWorkerSpec], + verifier_assignee: str, + synthesizer_assignee: str, + root_title: Optional[str] = None, + verifier_title: str = "Verify swarm outputs", + synthesizer_title: str = "Synthesize swarm outputs", + tenant: Optional[str] = None, + created_by: str = "swarm-orchestrator", + workspace_kind: str = "scratch", + workspace_path: Optional[str] = None, + priority: int = 0, + idempotency_key: Optional[str] = None, +) -> SwarmCreated: + """Create a durable Kanban swarm graph. + + The returned graph is immediately dispatchable: the planning root is marked + ``done`` with topology metadata, parallel workers are ``ready``, the verifier + waits for every worker, and the synthesizer waits for the verifier. + """ + + goal = _require_text(goal, "goal") + verifier_assignee = _require_text(verifier_assignee, "verifier_assignee") + synthesizer_assignee = _require_text(synthesizer_assignee, "synthesizer_assignee") + worker_specs = list(workers) + if not worker_specs: + raise ValueError("at least one worker is required") + for i, spec in enumerate(worker_specs, start=1): + _require_text(spec.profile, f"workers[{i}].profile") + _require_text(spec.title, f"workers[{i}].title") + + root = kb.create_task( + conn, + title=root_title or f"Swarm: {goal.splitlines()[0][:80]}", + body=( + "Kanban Swarm v1 planning/root card. This card is completed " + "immediately so parallel workers can start while it remains the " + "shared blackboard and audit anchor.\n\n" + f"Goal:\n{goal}" + ), + assignee=created_by, + created_by=created_by, + tenant=tenant, + priority=priority, + idempotency_key=idempotency_key, + workspace_kind=workspace_kind, + workspace_path=workspace_path, + skills=["kanban-orchestrator"], + ) + + # If idempotency returned an existing non-archived root, do not duplicate the + # swarm graph. Recover the topology from the root's latest blackboard, if it + # was created by this helper previously. + existing = latest_blackboard(conn, root).get("topology") + if isinstance(existing, dict): + worker_ids = [str(x) for x in existing.get("worker_ids", []) if x] + verifier_id = existing.get("verifier_id") + synthesizer_id = existing.get("synthesizer_id") + if worker_ids and verifier_id and synthesizer_id: + return SwarmCreated( + root_id=root, + worker_ids=worker_ids, + verifier_id=str(verifier_id), + synthesizer_id=str(synthesizer_id), + ) + + kb.complete_task( + conn, + root, + summary="Swarm topology planned; root remains the shared blackboard.", + metadata={ + "kind": "kanban_swarm_v1", + "goal": goal, + "worker_count": len(worker_specs), + }, + ) + + context_suffix = _swarm_context(root, goal) + worker_ids: list[str] = [] + for spec in worker_specs: + worker_id = kb.create_task( + conn, + title=spec.title, + body=(spec.body or "") + context_suffix, + assignee=spec.profile, + created_by=created_by, + parents=[root], + tenant=tenant, + priority=spec.priority or priority, + workspace_kind=workspace_kind, + workspace_path=workspace_path, + skills=spec.skills or None, + max_runtime_seconds=spec.max_runtime_seconds, + ) + worker_ids.append(worker_id) + + verifier_body = ( + "Review every worker handoff and blackboard update. Gate the swarm: " + "complete only with metadata {\"gate\": \"pass\"} when evidence is " + "sufficient; otherwise block with exact missing work." + + context_suffix + ) + verifier = kb.create_task( + conn, + title=verifier_title, + body=verifier_body, + assignee=verifier_assignee, + created_by=created_by, + parents=worker_ids, + tenant=tenant, + priority=priority, + workspace_kind=workspace_kind, + workspace_path=workspace_path, + skills=["requesting-code-review"], + ) + + synthesizer_body = ( + "Synthesize the verified worker outputs into the final deliverable. " + "Do not start until the verifier has passed the gate." + + context_suffix + ) + synthesizer = kb.create_task( + conn, + title=synthesizer_title, + body=synthesizer_body, + assignee=synthesizer_assignee, + created_by=created_by, + parents=[verifier], + tenant=tenant, + priority=priority, + workspace_kind=workspace_kind, + workspace_path=workspace_path, + skills=["avoid-ai-writing"], + ) + + created = SwarmCreated(root, worker_ids, verifier, synthesizer) + post_blackboard_update( + conn, + root, + author=created_by, + key="topology", + value=created.as_dict() | {"goal": goal}, + ) + return created + + +def post_blackboard_update( + conn: sqlite3.Connection, + root_id: str, + *, + author: str, + key: str, + value: Any, +) -> int: + """Append one structured update to the swarm root blackboard.""" + + _require_text(root_id, "root_id") + author = _require_text(author, "author") + key = _require_text(key, "key") + payload = json.dumps({"key": key, "value": value}, ensure_ascii=False, sort_keys=True) + return kb.add_comment(conn, root_id, author=author, body=BLACKBOARD_PREFIX + payload) + + +def latest_blackboard(conn: sqlite3.Connection, root_id: str) -> dict[str, Any]: + """Merge structured blackboard comments on a root card. + + Later comments replace earlier values for the same key. ``_authors`` records + the author of the winning value for traceability. + """ + + merged: dict[str, Any] = {} + authors: dict[str, str] = {} + for comment in kb.list_comments(conn, root_id): + body = comment.body or "" + if not body.startswith(BLACKBOARD_PREFIX): + continue + try: + payload = json.loads(body[len(BLACKBOARD_PREFIX):]) + except json.JSONDecodeError: + continue + key = payload.get("key") + if not isinstance(key, str) or not key: + continue + merged[key] = payload.get("value") + authors[key] = comment.author + if authors: + merged["_authors"] = authors + return merged + + +def parse_worker_arg(raw: str) -> SwarmWorkerSpec: + """Parse CLI ``--worker profile:title[:skill,skill]`` values.""" + + parts = [p.strip() for p in raw.split(":", 2)] + if len(parts) < 2: + raise ValueError("worker must be profile:title or profile:title:skill,skill") + skills: list[str] = [] + if len(parts) == 3 and parts[2]: + skills = [s.strip() for s in parts[2].split(",") if s.strip()] + return SwarmWorkerSpec(profile=parts[0], title=parts[1], body=parts[1], skills=skills) diff --git a/tests/hermes_cli/test_kanban_swarm.py b/tests/hermes_cli/test_kanban_swarm.py new file mode 100644 index 00000000000..358e41d4611 --- /dev/null +++ b/tests/hermes_cli/test_kanban_swarm.py @@ -0,0 +1,118 @@ +import json + +from hermes_cli import kanban_db as kb +from hermes_cli.kanban_swarm import ( + SwarmWorkerSpec, + create_swarm, + latest_blackboard, + post_blackboard_update, +) + + +def test_create_swarm_builds_parallel_workers_verifier_and_synthesizer(tmp_path): + conn = kb.connect(tmp_path / "kanban.db") + try: + created = create_swarm( + conn, + goal="Map the target market and produce a decision memo.", + workers=[ + SwarmWorkerSpec(profile="researcher-a", title="Market scan", body="Find competitors"), + SwarmWorkerSpec(profile="researcher-b", title="Customer scan", body="Find customer pains"), + ], + verifier_assignee="reviewer", + synthesizer_assignee="writer", + tenant="intel", + created_by="orchestrator", + ) + + root = kb.get_task(conn, created.root_id) + workers = [kb.get_task(conn, tid) for tid in created.worker_ids] + verifier = kb.get_task(conn, created.verifier_id) + synthesizer = kb.get_task(conn, created.synthesizer_id) + + assert root.status == "done" + assert root.assignee == "orchestrator" + assert [task.status for task in workers] == ["ready", "ready"] + assert [task.assignee for task in workers] == ["researcher-a", "researcher-b"] + assert verifier.status == "todo" + assert synthesizer.status == "todo" + assert set(kb.parent_ids(conn, created.verifier_id)) == set(created.worker_ids) + assert kb.parent_ids(conn, created.synthesizer_id) == [created.verifier_id] + assert all(created.root_id in (task.body or "") for task in workers) + finally: + conn.close() + + +def test_swarm_blackboard_merges_structured_updates(tmp_path): + conn = kb.connect(tmp_path / "kanban.db") + try: + created = create_swarm( + conn, + goal="Collect evidence.", + workers=[SwarmWorkerSpec(profile="researcher", title="Evidence", body="Find proof")], + verifier_assignee="reviewer", + synthesizer_assignee="writer", + ) + + post_blackboard_update( + conn, + created.root_id, + author="researcher", + key="sources", + value=["https://example.com/a"], + ) + post_blackboard_update( + conn, + created.root_id, + author="reviewer", + key="risks", + value={"missing_primary_source": True}, + ) + + board = latest_blackboard(conn, created.root_id) + assert board["sources"] == ["https://example.com/a"] + assert board["risks"] == {"missing_primary_source": True} + assert board["_authors"]["sources"] == "researcher" + finally: + conn.close() + + +def test_swarm_verifier_and_synthesis_are_dependency_gated(tmp_path): + conn = kb.connect(tmp_path / "kanban.db") + try: + created = create_swarm( + conn, + goal="Research two branches then verify and synthesize.", + workers=[ + SwarmWorkerSpec(profile="a", title="Branch A", body="A"), + SwarmWorkerSpec(profile="b", title="Branch B", body="B"), + ], + verifier_assignee="reviewer", + synthesizer_assignee="writer", + ) + + kb.complete_task( + conn, + created.worker_ids[0], + summary="A done", + metadata={"confidence": 0.8}, + ) + kb.recompute_ready(conn) + assert kb.get_task(conn, created.verifier_id).status == "todo" + assert kb.get_task(conn, created.synthesizer_id).status == "todo" + + kb.complete_task(conn, created.worker_ids[1], summary="B done") + kb.recompute_ready(conn) + assert kb.get_task(conn, created.verifier_id).status == "ready" + assert kb.get_task(conn, created.synthesizer_id).status == "todo" + + kb.complete_task( + conn, + created.verifier_id, + summary="Verified both branches", + metadata={"gate": "pass"}, + ) + kb.recompute_ready(conn) + assert kb.get_task(conn, created.synthesizer_id).status == "ready" + finally: + conn.close()