feat(cli): add kanban swarm topology helper

Salvages #26791 by @Niraven. Adds 'hermes kanban swarm' to create a
durable Kanban Swarm v1 graph: a completed root/blackboard card,
parallel worker cards, a verifier gated on all workers, and a
synthesizer gated on the verifier. Stores shared swarm blackboard
updates as structured JSON comments on the root card.

Self-contained: new hermes_cli/kanban_swarm.py module + CLI wiring +
unit tests.
This commit is contained in:
Niraven 2026-05-18 21:10:05 -07:00 committed by Teknium
parent 79f6654d16
commit 3ee7a5546d
3 changed files with 451 additions and 0 deletions

View file

@ -24,6 +24,7 @@ from pathlib import Path
from typing import Any, Optional
from hermes_cli import kanban_db as kb
from hermes_cli import kanban_swarm as ks
from hermes_cli.profiles import get_active_profile_name, get_profile_dir, seed_profile_skills
@ -313,6 +314,27 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
"to skip the brief running-to-blocked transition.")
p_create.add_argument("--json", action="store_true", help="Emit JSON output")
# --- swarm ---
p_swarm = sub.add_parser(
"swarm",
help="Create a Kanban Swarm v1 graph (parallel workers → verifier → synthesizer)",
)
p_swarm.add_argument("goal", help="Swarm goal / final outcome")
p_swarm.add_argument(
"--worker",
action="append",
default=[],
metavar="PROFILE:TITLE[:SKILL,SKILL]",
help="Parallel worker card (repeatable)",
)
p_swarm.add_argument("--verifier", required=True, help="Verifier profile")
p_swarm.add_argument("--synthesizer", required=True, help="Synthesizer/writer profile")
p_swarm.add_argument("--tenant", default=None, help="Tenant namespace")
p_swarm.add_argument("--priority", type=int, default=0, help="Priority tiebreaker")
p_swarm.add_argument("--created-by", default=None, help="Creator/anchor profile")
p_swarm.add_argument("--idempotency-key", default=None, help="Dedup key for the root card")
p_swarm.add_argument("--json", action="store_true", help="Emit JSON output")
# --- list ---
p_list = sub.add_parser("list", aliases=["ls"], help="List tasks")
p_list.add_argument("--mine", action="store_true",
@ -778,6 +800,7 @@ def kanban_command(args: argparse.Namespace) -> int:
handlers = {
"init": _cmd_init,
"create": _cmd_create,
"swarm": _cmd_swarm,
"list": _cmd_list,
"ls": _cmd_list,
"show": _cmd_show,
@ -1212,6 +1235,37 @@ def _cmd_create(args: argparse.Namespace) -> int:
return 0
def _cmd_swarm(args: argparse.Namespace) -> int:
try:
workers = [ks.parse_worker_arg(raw) for raw in (args.worker or [])]
except ValueError as exc:
print(f"kanban swarm: {exc}", file=sys.stderr)
return 2
if not workers:
print("kanban swarm: at least one --worker is required", file=sys.stderr)
return 2
with kb.connect() as conn:
created = ks.create_swarm(
conn,
goal=args.goal,
workers=workers,
verifier_assignee=args.verifier,
synthesizer_assignee=args.synthesizer,
tenant=args.tenant,
created_by=args.created_by or _profile_author(),
priority=args.priority,
idempotency_key=getattr(args, "idempotency_key", None),
)
if getattr(args, "json", False):
print(json.dumps(created.as_dict(), indent=2, ensure_ascii=False))
else:
print(f"Swarm root: {created.root_id}")
print("Workers: " + ", ".join(created.worker_ids))
print(f"Verifier: {created.verifier_id}")
print(f"Synthesizer: {created.synthesizer_id}")
return 0
def _cmd_list(args: argparse.Namespace) -> int:
assignee = args.assignee
if args.mine and not assignee:

279
hermes_cli/kanban_swarm.py Normal file
View file

@ -0,0 +1,279 @@
"""Kanban Swarm v1: thin swarm topology helpers on top of Kanban.
This module intentionally does not introduce a second scheduler. It writes a
small task graph into the existing Kanban kernel:
planning root (completed immediately)
parallel specialist workers (ready)
verifier (todo until all workers done)
synthesizer (todo until verifier done)
The shared blackboard is also deliberately low-tech: structured JSON comments on
the root task. That keeps all state in existing task_comments/task_events rows,
so the dashboard, notifier, slash command, and dispatcher keep working without a
new service.
"""
from __future__ import annotations
from dataclasses import dataclass, field
import json
import sqlite3
from typing import Any, Iterable, Optional
from hermes_cli import kanban_db as kb
BLACKBOARD_PREFIX = "[swarm:blackboard] "
@dataclass(frozen=True)
class SwarmWorkerSpec:
"""A single parallel worker card in a swarm."""
profile: str
title: str
body: str
skills: list[str] = field(default_factory=list)
priority: int = 0
max_runtime_seconds: Optional[int] = None
@dataclass(frozen=True)
class SwarmCreated:
"""IDs produced by :func:`create_swarm`."""
root_id: str
worker_ids: list[str]
verifier_id: str
synthesizer_id: str
def as_dict(self) -> dict[str, Any]:
return {
"root_id": self.root_id,
"worker_ids": list(self.worker_ids),
"verifier_id": self.verifier_id,
"synthesizer_id": self.synthesizer_id,
}
def _require_text(value: str, field_name: str) -> str:
text = (value or "").strip()
if not text:
raise ValueError(f"{field_name} is required")
return text
def _swarm_context(root_id: str, goal: str) -> str:
return (
"\n\n## Swarm protocol\n"
f"- Swarm root / shared blackboard: `{root_id}`.\n"
"- Read sibling/parent handoffs from Kanban context before working.\n"
"- Put machine-readable facts in completion metadata.\n"
"- Put cross-worker notes on the root task using structured comments.\n"
f"- Goal: {goal.strip()}\n"
)
def create_swarm(
conn: sqlite3.Connection,
*,
goal: str,
workers: Iterable[SwarmWorkerSpec],
verifier_assignee: str,
synthesizer_assignee: str,
root_title: Optional[str] = None,
verifier_title: str = "Verify swarm outputs",
synthesizer_title: str = "Synthesize swarm outputs",
tenant: Optional[str] = None,
created_by: str = "swarm-orchestrator",
workspace_kind: str = "scratch",
workspace_path: Optional[str] = None,
priority: int = 0,
idempotency_key: Optional[str] = None,
) -> SwarmCreated:
"""Create a durable Kanban swarm graph.
The returned graph is immediately dispatchable: the planning root is marked
``done`` with topology metadata, parallel workers are ``ready``, the verifier
waits for every worker, and the synthesizer waits for the verifier.
"""
goal = _require_text(goal, "goal")
verifier_assignee = _require_text(verifier_assignee, "verifier_assignee")
synthesizer_assignee = _require_text(synthesizer_assignee, "synthesizer_assignee")
worker_specs = list(workers)
if not worker_specs:
raise ValueError("at least one worker is required")
for i, spec in enumerate(worker_specs, start=1):
_require_text(spec.profile, f"workers[{i}].profile")
_require_text(spec.title, f"workers[{i}].title")
root = kb.create_task(
conn,
title=root_title or f"Swarm: {goal.splitlines()[0][:80]}",
body=(
"Kanban Swarm v1 planning/root card. This card is completed "
"immediately so parallel workers can start while it remains the "
"shared blackboard and audit anchor.\n\n"
f"Goal:\n{goal}"
),
assignee=created_by,
created_by=created_by,
tenant=tenant,
priority=priority,
idempotency_key=idempotency_key,
workspace_kind=workspace_kind,
workspace_path=workspace_path,
skills=["kanban-orchestrator"],
)
# If idempotency returned an existing non-archived root, do not duplicate the
# swarm graph. Recover the topology from the root's latest blackboard, if it
# was created by this helper previously.
existing = latest_blackboard(conn, root).get("topology")
if isinstance(existing, dict):
worker_ids = [str(x) for x in existing.get("worker_ids", []) if x]
verifier_id = existing.get("verifier_id")
synthesizer_id = existing.get("synthesizer_id")
if worker_ids and verifier_id and synthesizer_id:
return SwarmCreated(
root_id=root,
worker_ids=worker_ids,
verifier_id=str(verifier_id),
synthesizer_id=str(synthesizer_id),
)
kb.complete_task(
conn,
root,
summary="Swarm topology planned; root remains the shared blackboard.",
metadata={
"kind": "kanban_swarm_v1",
"goal": goal,
"worker_count": len(worker_specs),
},
)
context_suffix = _swarm_context(root, goal)
worker_ids: list[str] = []
for spec in worker_specs:
worker_id = kb.create_task(
conn,
title=spec.title,
body=(spec.body or "") + context_suffix,
assignee=spec.profile,
created_by=created_by,
parents=[root],
tenant=tenant,
priority=spec.priority or priority,
workspace_kind=workspace_kind,
workspace_path=workspace_path,
skills=spec.skills or None,
max_runtime_seconds=spec.max_runtime_seconds,
)
worker_ids.append(worker_id)
verifier_body = (
"Review every worker handoff and blackboard update. Gate the swarm: "
"complete only with metadata {\"gate\": \"pass\"} when evidence is "
"sufficient; otherwise block with exact missing work."
+ context_suffix
)
verifier = kb.create_task(
conn,
title=verifier_title,
body=verifier_body,
assignee=verifier_assignee,
created_by=created_by,
parents=worker_ids,
tenant=tenant,
priority=priority,
workspace_kind=workspace_kind,
workspace_path=workspace_path,
skills=["requesting-code-review"],
)
synthesizer_body = (
"Synthesize the verified worker outputs into the final deliverable. "
"Do not start until the verifier has passed the gate."
+ context_suffix
)
synthesizer = kb.create_task(
conn,
title=synthesizer_title,
body=synthesizer_body,
assignee=synthesizer_assignee,
created_by=created_by,
parents=[verifier],
tenant=tenant,
priority=priority,
workspace_kind=workspace_kind,
workspace_path=workspace_path,
skills=["avoid-ai-writing"],
)
created = SwarmCreated(root, worker_ids, verifier, synthesizer)
post_blackboard_update(
conn,
root,
author=created_by,
key="topology",
value=created.as_dict() | {"goal": goal},
)
return created
def post_blackboard_update(
conn: sqlite3.Connection,
root_id: str,
*,
author: str,
key: str,
value: Any,
) -> int:
"""Append one structured update to the swarm root blackboard."""
_require_text(root_id, "root_id")
author = _require_text(author, "author")
key = _require_text(key, "key")
payload = json.dumps({"key": key, "value": value}, ensure_ascii=False, sort_keys=True)
return kb.add_comment(conn, root_id, author=author, body=BLACKBOARD_PREFIX + payload)
def latest_blackboard(conn: sqlite3.Connection, root_id: str) -> dict[str, Any]:
"""Merge structured blackboard comments on a root card.
Later comments replace earlier values for the same key. ``_authors`` records
the author of the winning value for traceability.
"""
merged: dict[str, Any] = {}
authors: dict[str, str] = {}
for comment in kb.list_comments(conn, root_id):
body = comment.body or ""
if not body.startswith(BLACKBOARD_PREFIX):
continue
try:
payload = json.loads(body[len(BLACKBOARD_PREFIX):])
except json.JSONDecodeError:
continue
key = payload.get("key")
if not isinstance(key, str) or not key:
continue
merged[key] = payload.get("value")
authors[key] = comment.author
if authors:
merged["_authors"] = authors
return merged
def parse_worker_arg(raw: str) -> SwarmWorkerSpec:
"""Parse CLI ``--worker profile:title[:skill,skill]`` values."""
parts = [p.strip() for p in raw.split(":", 2)]
if len(parts) < 2:
raise ValueError("worker must be profile:title or profile:title:skill,skill")
skills: list[str] = []
if len(parts) == 3 and parts[2]:
skills = [s.strip() for s in parts[2].split(",") if s.strip()]
return SwarmWorkerSpec(profile=parts[0], title=parts[1], body=parts[1], skills=skills)