hermes-agent/tests/plugins/test_kanban_dashboard_plugin.py
Teknium c868425467
feat(kanban): durable multi-profile collaboration board (#17805)
Salvage of PR #16100 onto current main (after emozilla's #17514 fix
that unblocks plugin Pydantic body validation). History preserved on
the standing `feat/kanban-standing` branch; this squashes the 22
iterative commits into one clean landing.

What this lands:
- SQLite kernel (hermes_cli/kanban_db.py) — durable task board with
  tasks, task_links, task_runs, task_comments, task_events,
  kanban_notify_subs tables. WAL mode, atomic claim via CAS,
  tenant-namespaced, skills JSON array per task, max-runtime timeouts,
  worker heartbeats, idempotency keys, circuit breaker on repeated
  spawn failures, crash detection via /proc/<pid>/status, run history
  preserved across attempts.
- Dispatcher — runs inside the gateway by default
  (`kanban.dispatch_in_gateway: true`). Ticks every 60s, reclaims
  stale claims, promotes ready tasks, spawns `hermes -p <assignee>
  chat -q "work kanban task <id>"` with HERMES_KANBAN_TASK +
  HERMES_KANBAN_WORKSPACE env. Auto-loads `--skills kanban-worker`
  plus any per-task skills. Health telemetry warns on stuck ready
  queue.
- Structured tool surface (tools/kanban_tools.py) — 7 tools
  (kanban_show, kanban_complete, kanban_block, kanban_heartbeat,
  kanban_comment, kanban_create, kanban_link). Gated on
  HERMES_KANBAN_TASK via check_fn so zero schema footprint in normal
  sessions.
- System-prompt guidance (agent/prompt_builder.py KANBAN_GUIDANCE)
  injected only when kanban tools are active.
- Dashboard plugin (plugins/kanban/dashboard/) — Linear-style board
  UI: triage/todo/ready/running/blocked/done columns, drag-drop,
  inline create, task drawer with markdown, comments, run history,
  dependency editor, bulk ops, lanes-by-profile grouping, WS-driven
  live refresh. Matches active dashboard theme via CSS variables.
- CLI — `hermes kanban init|create|list|show|assign|link|unlink|
  claim|comment|complete|block|unblock|archive|tail|dispatch|context|
  gc|watch|stats|notify|log|heartbeat|runs|assignees` +
  `/kanban` slash in-session.
- Worker + orchestrator skills (skills/devops/kanban-worker +
  kanban-orchestrator) — pattern library for good summary/metadata
  shapes, retry diagnostics, block-reason examples, fan-out patterns.
- Per-task force-loaded skills — `--skill <name>` (repeatable),
  stored as JSON, threaded through to dispatcher argv as one
  `--skills X` pair per skill alongside the built-in kanban-worker.
  Dashboard + CLI + tool parity.
- Deprecation of standalone `hermes kanban daemon` — stub exits 2
  with migration guidance; `--force` escape hatch for headless hosts.
- Docs (website/docs/user-guide/features/kanban.md + kanban-tutorial.md)
  with 11 dashboard screenshots walking through four user stories
  (Solo Dev, Fleet Farming, Role Pipeline, Circuit Breaker).
- Tests (251 passing): kernel schema + migration + CAS atomicity,
  dispatcher logic, circuit breaker, crash detection, max-runtime
  timeouts, claim lifecycle, tenant isolation, idempotency keys, per-
  task skills round-trip + validation + dispatcher argv, tool surface
  (7 tools × round-trip + error paths), dashboard REST (CRUD + bulk
  + links + warnings), gateway-embedded dispatcher (config gate, env
  override, graceful shutdown), CLI deprecation stub, migration from
  legacy schemas.
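
The "atomic claim via CAS" idea from the kernel bullet can be illustrated with a minimal sketch. It is not the code in hermes_cli/kanban_db.py: the `try_claim` helper, the `ttl_seconds` parameter, and the simplified single-table schema are assumptions for illustration (the commit places the claim columns on task_runs, not on tasks).

```python
import sqlite3
import time
import uuid
from typing import Optional


def try_claim(conn: sqlite3.Connection, task_id: str, ttl_seconds: int = 900) -> Optional[str]:
    """Try to claim a ready task; return a claim token, or None if the race was lost."""
    token = uuid.uuid4().hex
    now = int(time.time())
    # Compare-and-swap: the UPDATE only matches while the row is still
    # 'ready' and unclaimed, so at most one caller can flip it.
    cur = conn.execute(
        "UPDATE tasks SET status = 'running', claim_lock = ?, claim_expires = ? "
        "WHERE id = ? AND status = 'ready' AND claim_lock IS NULL",
        (token, now + ttl_seconds, task_id),
    )
    conn.commit()
    return token if cur.rowcount == 1 else None


# Hypothetical, simplified schema used only for this demonstration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, "
    "claim_lock TEXT, claim_expires INTEGER)"
)
conn.execute("INSERT INTO tasks VALUES ('t1', 'ready', NULL, NULL)")
first = try_claim(conn, "t1")   # wins the claim
second = try_claim(conn, "t1")  # row no longer matches the WHERE clause
```

The second call changes no rows because the task is no longer `ready`, which is what makes the claim safe without an explicit lock.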

Gateway integration:
- GatewayRunner._kanban_dispatcher_watcher — new asyncio background
  task, symmetric with _kanban_notifier_watcher. Runs dispatch_once
  via asyncio.to_thread so SQLite WAL never blocks the loop. Sleeps
  in 1s slices for snappy shutdown. Respects HERMES_KANBAN_DISPATCH_IN_GATEWAY=0
  env override for debugging.
- Config: new `kanban` section in DEFAULT_CONFIG with
  `dispatch_in_gateway: true` (default) + `dispatch_interval_seconds: 60`.
  Additive: no _config_version bump needed.
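
A rough sketch of the watcher pattern described above: blocking work is pushed to a thread with asyncio.to_thread, and the interval sleep is cut into short slices so shutdown is noticed quickly. The `dispatcher_watcher` function, the `stop` event, and the `_demo` driver are hypothetical names; the actual GatewayRunner method is not shown in this commit message.

```python
import asyncio
import time


async def dispatcher_watcher(dispatch_once, stop: asyncio.Event, interval: float = 60.0) -> int:
    """Call dispatch_once off-loop every `interval` seconds until `stop` is set."""
    ticks = 0
    while not stop.is_set():
        # Blocking SQLite work runs in a worker thread, not on the event loop.
        await asyncio.to_thread(dispatch_once)
        ticks += 1
        # Sleep in slices of at most 1s so a shutdown request is seen promptly.
        deadline = time.monotonic() + interval
        while not stop.is_set():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            await asyncio.sleep(min(1.0, remaining))
    return ticks


async def _demo():
    stop = asyncio.Event()
    calls = []
    task = asyncio.create_task(
        dispatcher_watcher(lambda: calls.append(1), stop, interval=0.05)
    )
    await asyncio.sleep(0.2)
    stop.set()  # request shutdown; the watcher exits at its next check
    return await task, len(calls)


ticks, calls = asyncio.run(_demo())
```

With the default 60s interval, the inner loop bounds shutdown latency to roughly one second plus whatever time an in-flight `dispatch_once` call still needs.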

Forward-compat:
- workflow_template_id / current_step_key columns on tasks (v1 writes
  NULL; v2 will use them for routing).
- task_runs holds claim machinery (claim_lock, claim_expires,
  worker_pid, last_heartbeat_at) so multi-attempt history is first-
  class from day one.
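
As an illustration of how the claim fields on task_runs could drive "reclaims stale claims", here is a hedged sketch. The field names come from the list above; the `Run` dataclass, the `is_stale` helper, and the `heartbeat_grace` threshold are assumptions, and the kernel's real reclaim rules may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Run:
    claim_lock: Optional[str]
    claim_expires: Optional[float]
    worker_pid: Optional[int]
    last_heartbeat_at: Optional[float]


def is_stale(run: Run, now: float, heartbeat_grace: float = 120.0) -> bool:
    """Return True when a claimed run looks abandoned and may be reclaimed."""
    if run.claim_lock is None:
        return False  # nothing is claimed, so there is nothing to reclaim
    if run.claim_expires is not None and now >= run.claim_expires:
        return True  # the claim's lease ran out
    if run.last_heartbeat_at is not None and now - run.last_heartbeat_at > heartbeat_grace:
        return True  # the worker stopped heartbeating
    return False
```

Keeping these fields on the run rather than the task means each attempt carries its own lease and heartbeat record, which matches the "multi-attempt history" goal stated above.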

Closes #16102.

Co-authored-by: emozilla <emozilla@nousresearch.com>
2026-04-30 13:36:47 -07:00

889 lines
31 KiB
Python

"""Tests for the Kanban dashboard plugin backend (plugins/kanban/dashboard/plugin_api.py).
The plugin mounts as /api/plugins/kanban/ inside the dashboard's FastAPI app,
but here we attach its router to a bare FastAPI instance so we can test the
REST surface without spinning up the whole dashboard.
"""
from __future__ import annotations
import importlib.util
import os
import sys
import time
from pathlib import Path
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from hermes_cli import kanban_db as kb
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _load_plugin_router():
    """Dynamically load plugins/kanban/dashboard/plugin_api.py and return its router."""
    repo_root = Path(__file__).resolve().parents[2]
    plugin_file = repo_root / "plugins" / "kanban" / "dashboard" / "plugin_api.py"
    assert plugin_file.exists(), f"plugin file missing: {plugin_file}"
    spec = importlib.util.spec_from_file_location(
        "hermes_dashboard_plugin_kanban_test", plugin_file,
    )
    assert spec is not None and spec.loader is not None
    mod = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = mod
    spec.loader.exec_module(mod)
    return mod.router


@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Isolated HERMES_HOME with an empty kanban DB."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    kb.init_db()
    return home


@pytest.fixture
def client(kanban_home):
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    return TestClient(app)
# ---------------------------------------------------------------------------
# GET /board on an empty DB
# ---------------------------------------------------------------------------
def test_board_empty(client):
    r = client.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    data = r.json()
    # All canonical columns present (triage + the rest), each empty.
    names = [c["name"] for c in data["columns"]]
    for expected in ("triage", "todo", "ready", "running", "blocked", "done"):
        assert expected in names, f"missing column {expected}: {names}"
    assert all(len(c["tasks"]) == 0 for c in data["columns"])
    assert data["tenants"] == []
    assert data["assignees"] == []
    assert data["latest_event_id"] == 0
# ---------------------------------------------------------------------------
# POST /tasks then GET /board sees it
# ---------------------------------------------------------------------------
def test_create_task_appears_on_board(client):
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={
            "title": "Research LLM caching",
            "assignee": "researcher",
            "priority": 3,
            "tenant": "acme",
        },
    )
    assert r.status_code == 200, r.text
    task = r.json()["task"]
    assert task["title"] == "Research LLM caching"
    assert task["assignee"] == "researcher"
    assert task["status"] == "ready"  # no parents -> immediately ready
    assert task["priority"] == 3
    assert task["tenant"] == "acme"
    task_id = task["id"]
    # Board now lists it under 'ready'.
    r = client.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    data = r.json()
    ready = next(c for c in data["columns"] if c["name"] == "ready")
    assert len(ready["tasks"]) == 1
    assert ready["tasks"][0]["id"] == task_id
    assert "acme" in data["tenants"]
    assert "researcher" in data["assignees"]


def test_tenant_filter(client):
    client.post("/api/plugins/kanban/tasks", json={"title": "A", "tenant": "t1"})
    client.post("/api/plugins/kanban/tasks", json={"title": "B", "tenant": "t2"})
    r = client.get("/api/plugins/kanban/board?tenant=t1")
    counts = {c["name"]: len(c["tasks"]) for c in r.json()["columns"]}
    total = sum(counts.values())
    assert total == 1
    r = client.get("/api/plugins/kanban/board?tenant=t2")
    total = sum(len(c["tasks"]) for c in r.json()["columns"])
    assert total == 1
# ---------------------------------------------------------------------------
# GET /tasks/:id returns body + comments + events + links
# ---------------------------------------------------------------------------
def test_task_detail_includes_links_and_events(client):
    parent = client.post(
        "/api/plugins/kanban/tasks", json={"title": "parent"},
    ).json()["task"]
    child = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "child", "parents": [parent["id"]]},
    ).json()["task"]
    assert child["status"] == "todo"  # parent not done yet
    # Detail for the child shows the parent link.
    r = client.get(f"/api/plugins/kanban/tasks/{child['id']}")
    assert r.status_code == 200
    data = r.json()
    assert data["task"]["id"] == child["id"]
    assert parent["id"] in data["links"]["parents"]
    # Detail for the parent shows the child.
    r = client.get(f"/api/plugins/kanban/tasks/{parent['id']}")
    assert child["id"] in r.json()["links"]["children"]
    # Events exist from creation.
    assert len(data["events"]) >= 1


def test_task_detail_404_on_unknown(client):
    r = client.get("/api/plugins/kanban/tasks/does-not-exist")
    assert r.status_code == 404
# ---------------------------------------------------------------------------
# PATCH /tasks/:id — status transitions
# ---------------------------------------------------------------------------
def test_patch_status_complete(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "done", "result": "shipped"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "done"
    # Board reflects the move.
    done = next(
        c for c in client.get("/api/plugins/kanban/board").json()["columns"]
        if c["name"] == "done"
    )
    assert any(x["id"] == t["id"] for x in done["tasks"])


def test_patch_block_then_unblock(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "blocked", "block_reason": "need input"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "blocked"
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "ready"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "ready"


def test_patch_drag_drop_move_todo_to_ready(client):
    """Direct status write: the drag-drop path for statuses without a
    dedicated verb (e.g. manually promoting todo -> ready)."""
    parent = client.post("/api/plugins/kanban/tasks", json={"title": "p"}).json()["task"]
    child = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "c", "parents": [parent["id"]]},
    ).json()["task"]
    assert child["status"] == "todo"
    r = client.patch(
        f"/api/plugins/kanban/tasks/{child['id']}",
        json={"status": "ready"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "ready"


def test_patch_reassign(client):
    t = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "x", "assignee": "a"},
    ).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"assignee": "b"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["assignee"] == "b"


def test_patch_priority_and_edit(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"priority": 5, "title": "renamed"},
    )
    assert r.status_code == 200
    data = r.json()["task"]
    assert data["priority"] == 5
    assert data["title"] == "renamed"


def test_patch_invalid_status(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "banana"},
    )
    assert r.status_code == 400
# ---------------------------------------------------------------------------
# Comments + Links
# ---------------------------------------------------------------------------
def test_add_comment(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.post(
        f"/api/plugins/kanban/tasks/{t['id']}/comments",
        json={"body": "how's progress?", "author": "teknium"},
    )
    assert r.status_code == 200
    r = client.get(f"/api/plugins/kanban/tasks/{t['id']}")
    comments = r.json()["comments"]
    assert len(comments) == 1
    assert comments[0]["body"] == "how's progress?"
    assert comments[0]["author"] == "teknium"


def test_add_comment_empty_rejected(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.post(
        f"/api/plugins/kanban/tasks/{t['id']}/comments",
        json={"body": " "},
    )
    assert r.status_code == 400


def test_add_link_and_delete_link(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    r = client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": a["id"], "child_id": b["id"]},
    )
    assert r.status_code == 200
    r = client.get(f"/api/plugins/kanban/tasks/{b['id']}")
    assert a["id"] in r.json()["links"]["parents"]
    r = client.delete(
        "/api/plugins/kanban/links",
        params={"parent_id": a["id"], "child_id": b["id"]},
    )
    assert r.status_code == 200
    assert r.json()["ok"] is True


def test_add_link_cycle_rejected(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": a["id"], "child_id": b["id"]},
    )
    r = client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": b["id"], "child_id": a["id"]},
    )
    assert r.status_code == 400
# ---------------------------------------------------------------------------
# Dispatch nudge
# ---------------------------------------------------------------------------
def test_dispatch_dry_run(client):
    client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "work", "assignee": "researcher"},
    )
    r = client.post("/api/plugins/kanban/dispatch?dry_run=true&max=4")
    assert r.status_code == 200
    body = r.json()
    # DispatchResult is serialized as a dataclass dict.
    assert isinstance(body, dict)
# ---------------------------------------------------------------------------
# Triage column (new v1 status)
# ---------------------------------------------------------------------------
def test_create_triage_lands_in_triage_column(client):
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "rough idea, spec me", "triage": True},
    )
    assert r.status_code == 200
    task = r.json()["task"]
    assert task["status"] == "triage"
    r = client.get("/api/plugins/kanban/board")
    triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
    assert len(triage["tasks"]) == 1
    assert triage["tasks"][0]["title"] == "rough idea, spec me"


def test_triage_task_not_promoted_to_ready(client):
    """Triage tasks must stay in triage even when they have no parents."""
    client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "must stay put", "triage": True},
    )
    # Run the dispatcher — it should NOT promote the triage task.
    client.post("/api/plugins/kanban/dispatch?dry_run=false&max=4")
    r = client.get("/api/plugins/kanban/board")
    triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
    ready = next(c for c in r.json()["columns"] if c["name"] == "ready")
    assert len(triage["tasks"]) == 1
    assert len(ready["tasks"]) == 0


def test_patch_status_triage_works(client):
    """A user (or specifier) can push a task back into triage, and out of it."""
    t = client.post(
        "/api/plugins/kanban/tasks", json={"title": "x"},
    ).json()["task"]
    # Normal creation is 'ready'; push to triage.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "triage"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "triage"
    # Now promote to todo.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "todo"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "todo"
# ---------------------------------------------------------------------------
# Progress rollup (done children / total children)
# ---------------------------------------------------------------------------
def test_board_progress_rollup(client):
    parent = client.post(
        "/api/plugins/kanban/tasks", json={"title": "parent"},
    ).json()["task"]
    child_a = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "a", "parents": [parent["id"]]},
    ).json()["task"]
    child_b = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "b", "parents": [parent["id"]]},
    ).json()["task"]
    # Children start as "todo" because the parent isn't done yet; promote
    # them to "ready" so complete_task will accept the transition.
    for cid in (child_a["id"], child_b["id"]):
        r = client.patch(
            f"/api/plugins/kanban/tasks/{cid}", json={"status": "ready"},
        )
        assert r.status_code == 200
    # 0/2 done.
    r = client.get("/api/plugins/kanban/board")
    parent_row = next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == parent["id"]
    )
    assert parent_row["progress"] == {"done": 0, "total": 2}
    # Complete one child. 1/2.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{child_a['id']}",
        json={"status": "done"},
    )
    assert r.status_code == 200
    r = client.get("/api/plugins/kanban/board")
    parent_row = next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == parent["id"]
    )
    assert parent_row["progress"] == {"done": 1, "total": 2}
    # Childless tasks report progress=None, not {0/0}.
    assert next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == child_b["id"]
    )["progress"] is None
# ---------------------------------------------------------------------------
# Auto-init on first board read
# ---------------------------------------------------------------------------
def test_board_auto_initializes_missing_db(tmp_path, monkeypatch):
    """If kanban.db doesn't exist yet, GET /board must create it, not 500."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Deliberately DO NOT call kb.init_db().
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    c = TestClient(app)
    r = c.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    assert (home / "kanban.db").exists(), "init_db wasn't invoked by /board"
# ---------------------------------------------------------------------------
# WebSocket auth (query-param token)
# ---------------------------------------------------------------------------
def test_ws_events_rejects_when_token_required(tmp_path, monkeypatch):
    """When _SESSION_TOKEN is set (normal dashboard context), a missing or
    wrong ?token= query param must be rejected with policy-violation."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    kb.init_db()
    # Stub web_server so _check_ws_token has a token to compare against.
    import types
    stub = types.SimpleNamespace(_SESSION_TOKEN="secret-xyz")
    monkeypatch.setitem(sys.modules, "hermes_cli.web_server", stub)
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    c = TestClient(app)
    # No token → policy violation close.
    from starlette.websockets import WebSocketDisconnect
    with pytest.raises(WebSocketDisconnect) as exc:
        with c.websocket_connect("/api/plugins/kanban/events"):
            pass
    assert exc.value.code == 1008
    # Wrong token → policy violation close.
    with pytest.raises(WebSocketDisconnect) as exc:
        with c.websocket_connect("/api/plugins/kanban/events?token=nope"):
            pass
    assert exc.value.code == 1008
    # Correct token → accepted (connect then close cleanly from our side).
    with c.websocket_connect(
        "/api/plugins/kanban/events?token=secret-xyz"
    ) as ws:
        assert ws is not None  # handshake succeeded
# ---------------------------------------------------------------------------
# Bulk actions
# ---------------------------------------------------------------------------
def test_bulk_status_ready(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
    # Parent-less tasks land in "ready" already; push them to blocked first.
    for tid in (a["id"], b["id"], c2["id"]):
        client.patch(f"/api/plugins/kanban/tasks/{tid}",
                     json={"status": "blocked", "block_reason": "wait"})
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"], c2["id"]], "status": "ready"})
    assert r.status_code == 200
    results = r.json()["results"]
    assert all(r["ok"] for r in results)
    # All three are now ready.
    board = client.get("/api/plugins/kanban/board").json()
    ready = next(col for col in board["columns"] if col["name"] == "ready")
    ids = {t["id"] for t in ready["tasks"]}
    assert {a["id"], b["id"], c2["id"]}.issubset(ids)


def test_bulk_archive(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"]], "archive": True})
    assert r.status_code == 200
    assert all(r["ok"] for r in r.json()["results"])
    # Default board (archived hidden) — both gone.
    board = client.get("/api/plugins/kanban/board").json()
    ids = {t["id"] for col in board["columns"] for t in col["tasks"]}
    assert a["id"] not in ids
    assert b["id"] not in ids


def test_bulk_reassign(client):
    a = client.post("/api/plugins/kanban/tasks",
                    json={"title": "a", "assignee": "old"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks",
                    json={"title": "b", "assignee": "old"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"]], "assignee": "new"})
    assert r.status_code == 200
    for tid in (a["id"], b["id"]):
        t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
        assert t["assignee"] == "new"


def test_bulk_unassign_via_empty_string(client):
    a = client.post("/api/plugins/kanban/tasks",
                    json={"title": "a", "assignee": "x"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"]], "assignee": ""})
    assert r.status_code == 200
    t = client.get(f"/api/plugins/kanban/tasks/{a['id']}").json()["task"]
    assert t["assignee"] is None


def test_bulk_partial_failure_doesnt_abort_siblings(client):
    """One bad id in the middle of a batch must not prevent others from
    applying."""
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], "bogus-id", c2["id"]], "priority": 7})
    assert r.status_code == 200
    results = r.json()["results"]
    assert len(results) == 3
    ok_ids = {r["id"] for r in results if r["ok"]}
    assert a["id"] in ok_ids
    assert c2["id"] in ok_ids
    assert any(not r["ok"] and r["id"] == "bogus-id" for r in results)
    # Good siblings actually got the priority bump.
    for tid in (a["id"], c2["id"]):
        t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
        assert t["priority"] == 7


def test_bulk_empty_ids_400(client):
    r = client.post("/api/plugins/kanban/tasks/bulk", json={"ids": []})
    assert r.status_code == 400
# ---------------------------------------------------------------------------
# /config endpoint
# ---------------------------------------------------------------------------
def test_config_returns_defaults_when_section_missing(client):
    r = client.get("/api/plugins/kanban/config")
    assert r.status_code == 200
    data = r.json()
    # Defaults when dashboard.kanban is missing.
    assert data["default_tenant"] == ""
    assert data["lane_by_profile"] is True
    assert data["include_archived_by_default"] is False
    assert data["render_markdown"] is True


def test_config_reads_dashboard_kanban_section(tmp_path, monkeypatch, client):
    home = Path(os.environ["HERMES_HOME"])
    # The YAML nesting matters: kanban sits under dashboard, and the four
    # settings sit under kanban.
    (home / "config.yaml").write_text(
        "dashboard:\n"
        "  kanban:\n"
        "    default_tenant: acme\n"
        "    lane_by_profile: false\n"
        "    include_archived_by_default: true\n"
        "    render_markdown: false\n"
    )
    r = client.get("/api/plugins/kanban/config")
    assert r.status_code == 200
    data = r.json()
    assert data["default_tenant"] == "acme"
    assert data["lane_by_profile"] is False
    assert data["include_archived_by_default"] is True
    assert data["render_markdown"] is False
# ---------------------------------------------------------------------------
# Runs surfacing (vulcan-artivus RFC feedback)
# ---------------------------------------------------------------------------
def test_task_detail_includes_runs(client):
    """GET /tasks/:id carries a runs[] array with the attempt history."""
    r = client.post("/api/plugins/kanban/tasks",
                    json={"title": "port x", "assignee": "worker"}).json()
    tid = r["task"]["id"]
    # Force a run to exist by claiming through the kernel. PATCH to
    # status=running would not do it: that path uses _set_status_direct
    # and never calls claim_task.
    import hermes_cli.kanban_db as _kb
    conn = _kb.connect()
    try:
        _kb.claim_task(conn, tid)
        _kb.complete_task(
            conn, tid,
            result="done",
            summary="tested on rate limiter",
            metadata={"changed_files": ["limiter.py"]},
        )
    finally:
        conn.close()
    d = client.get(f"/api/plugins/kanban/tasks/{tid}").json()
    assert "runs" in d
    assert len(d["runs"]) == 1
    run = d["runs"][0]
    assert run["outcome"] == "completed"
    assert run["profile"] == "worker"
    assert run["summary"] == "tested on rate limiter"
    assert run["metadata"] == {"changed_files": ["limiter.py"]}
    assert run["ended_at"] is not None


def test_task_detail_runs_empty_before_claim(client):
    """A task that's never been claimed has an empty runs[] list, not
    a missing key."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "fresh"}).json()
    d = client.get(f"/api/plugins/kanban/tasks/{r['task']['id']}").json()
    assert d["runs"] == []
def test_patch_status_done_with_summary_and_metadata(client):
    """PATCH /tasks/:id with status=done + summary + metadata must
    reach complete_task, so the dashboard has CLI parity."""
    # Create + claim.
    r = client.post("/api/plugins/kanban/tasks", json={"title": "x", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    from hermes_cli import kanban_db as kb
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
    finally:
        conn.close()
    r = client.patch(
        f"/api/plugins/kanban/tasks/{tid}",
        json={
            "status": "done",
            "summary": "shipped the thing",
            "metadata": {"changed_files": ["a.py", "b.py"], "tests_run": 7},
        },
    )
    assert r.status_code == 200, r.text
    # The run must have the summary + metadata attached.
    conn = kb.connect()
    try:
        run = kb.latest_run(conn, tid)
        assert run.outcome == "completed"
        assert run.summary == "shipped the thing"
        assert run.metadata == {"changed_files": ["a.py", "b.py"], "tests_run": 7}
    finally:
        conn.close()


def test_patch_status_done_without_summary_still_works(client):
    """Back-compat: PATCH without the new fields still completes."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "y", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    from hermes_cli import kanban_db as kb
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
    finally:
        conn.close()
    r = client.patch(
        f"/api/plugins/kanban/tasks/{tid}",
        json={"status": "done", "result": "legacy shape"},
    )
    assert r.status_code == 200, r.text
    conn = kb.connect()
    try:
        run = kb.latest_run(conn, tid)
        assert run.outcome == "completed"
        assert run.summary == "legacy shape"  # falls back to result
    finally:
        conn.close()


def test_patch_status_archive_closes_running_run(client):
    """PATCH to archived while running must close the in-flight run."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "z", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    from hermes_cli import kanban_db as kb
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
        open_run = kb.latest_run(conn, tid)
        assert open_run.ended_at is None
    finally:
        conn.close()
    r = client.patch(
        f"/api/plugins/kanban/tasks/{tid}",
        json={"status": "archived"},
    )
    assert r.status_code == 200, r.text
    conn = kb.connect()
    try:
        task = kb.get_task(conn, tid)
        assert task.status == "archived"
        assert task.current_run_id is None
        assert kb.latest_run(conn, tid).outcome == "reclaimed"
    finally:
        conn.close()


def test_event_dict_includes_run_id(client):
    """GET /tasks/:id returns events with run_id populated."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "e", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    from hermes_cli import kanban_db as kb
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
        run_id = kb.latest_run(conn, tid).id
        kb.complete_task(conn, tid, summary="wss")
    finally:
        conn.close()
    r = client.get(f"/api/plugins/kanban/tasks/{tid}")
    assert r.status_code == 200
    events = r.json()["events"]
    # Every event in the response must have a run_id key (None or int).
    for e in events:
        assert "run_id" in e, f"missing run_id in event: {e}"
    # completed event must have the actual run_id.
    comp = [e for e in events if e["kind"] == "completed"]
    assert comp[0]["run_id"] == run_id
# ---------------------------------------------------------------------------
# Per-task force-loaded skills via REST
# ---------------------------------------------------------------------------
def test_create_task_with_skills_roundtrips(client):
    """POST /tasks accepts `skills: [...]`, GET /tasks/:id returns it."""
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={
            "title": "translate docs",
            "assignee": "linguist",
            "skills": ["translation", "github-code-review"],
        },
    )
    assert r.status_code == 200, r.text
    task = r.json()["task"]
    assert task["skills"] == ["translation", "github-code-review"]
    # Fetch via GET /tasks/:id as the drawer does.
    got = client.get(f"/api/plugins/kanban/tasks/{task['id']}").json()
    assert got["task"]["skills"] == ["translation", "github-code-review"]


def test_create_task_without_skills_defaults_to_empty_list(client):
    """A task created without skills serializes `skills` as None or [];
    the drawer guards against both, so either shape is accepted here."""
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "no skills", "assignee": "x"},
    )
    assert r.status_code == 200, r.text
    task = r.json()["task"]
    # Task.skills is None in-memory; _task_dict serializes via
    # dataclasses.asdict which keeps it None. The drawer's
    # `t.skills && t.skills.length > 0` guard handles both null and [].
    assert task.get("skills") in (None, [])
# ---------------------------------------------------------------------------
# Dispatcher-presence warning in POST /tasks response
# ---------------------------------------------------------------------------
def test_create_task_includes_warning_when_no_dispatcher(client, monkeypatch):
    """ready+assigned task + no gateway -> response has `warning` field
    so the dashboard UI can surface a banner."""
    # Force the dispatcher probe to report "not running".
    monkeypatch.setattr(
        "hermes_cli.kanban._check_dispatcher_presence",
        lambda: (False, "No gateway is running — start `hermes gateway start`."),
    )
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "warn-me", "assignee": "worker"},
    )
    assert r.status_code == 200
    data = r.json()
    assert data.get("warning")
    assert "gateway" in data["warning"].lower()


def test_create_task_no_warning_when_dispatcher_up(client, monkeypatch):
    """Dispatcher running -> no `warning` field in the response."""
    monkeypatch.setattr(
        "hermes_cli.kanban._check_dispatcher_presence",
        lambda: (True, ""),
    )
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "silent", "assignee": "worker"},
    )
    assert r.status_code == 200
    assert "warning" not in r.json() or not r.json()["warning"]


def test_create_task_no_warning_on_triage(client, monkeypatch):
    """Triage tasks never get the warning (they can't be dispatched
    anyway until promoted)."""
    monkeypatch.setattr(
        "hermes_cli.kanban._check_dispatcher_presence",
        lambda: (False, "oh no"),
    )
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "triage-task", "assignee": "worker", "triage": True},
    )
    assert r.status_code == 200
    assert "warning" not in r.json() or not r.json()["warning"]


def test_create_task_probe_error_does_not_break_create(client, monkeypatch):
    """Probe failure must never break task creation."""
    def _raise():
        raise RuntimeError("probe crashed")
    monkeypatch.setattr(
        "hermes_cli.kanban._check_dispatcher_presence", _raise,
    )
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "resilient", "assignee": "worker"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["title"] == "resilient"