hermes-agent/tests/plugins/test_kanban_dashboard_plugin.py
Teknium f67063ba81
feat(kanban): generic diagnostics engine for task distress signals (#20332)
* feat(kanban): generic diagnostics engine for task distress signals

Replaces the hallucination-specific ``warnings`` / ``RecoverySection``
surface (shipped in PR #20232) with a reusable diagnostic-rule engine
that covers five distress kinds in v1 and can be extended without
touching UI code. The "something's wrong with this task" signal is
no longer limited to phantom card ids.

Closes the follow-up from #20232 discussion.

New module
----------
``hermes_cli/kanban_diagnostics.py`` — stateless, no-side-effect rule
engine. Each rule is a pure function of
``(task, events, runs, now, config) -> list[Diagnostic]``. Registry
is a simple list; adding a new distress kind is one function + one
import, no UI or API changes required.

v1 rule set
-----------
* ``hallucinated_cards`` (error) — folds the existing
  ``completion_blocked_hallucination`` event into the new surface.
* ``prose_phantom_refs`` (warning) — folds
  ``suspected_hallucinated_references``.
* ``repeated_spawn_failures`` (error → critical at 2x threshold) —
  fires when ``tasks.spawn_failures >= 3``; suggests
  ``hermes -p <profile> doctor`` / ``auth``.
* ``repeated_crashes`` (error → critical) — fires after N consecutive
  ``crashed`` run outcomes with no successful completion between;
  suggests ``hermes kanban log <id>``.
* ``stuck_in_blocked`` (warning) — fires after 24h in ``blocked``
  state with no comments / unblock attempts; suggests commenting.
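
The rule/registry shape above can be sketched as follows — a minimal illustration only: the ``Diagnostic`` fields, helper names, and config keys here are assumptions, not the actual contents of ``kanban_diagnostics.py``:

```python
from dataclasses import dataclass, field


@dataclass
class Diagnostic:
    kind: str       # e.g. "repeated_spawn_failures"
    severity: str   # "warning" | "error" | "critical"
    title: str
    actions: list = field(default_factory=list)


def repeated_spawn_failures(task, events, runs, now, config):
    """Pure rule: fires at the threshold, escalates to critical at 2x."""
    threshold = config.get("spawn_failure_threshold", 3)
    n = task.get("spawn_failures", 0)
    if n < threshold:
        return []
    severity = "critical" if n >= 2 * threshold else "error"
    return [Diagnostic(kind="repeated_spawn_failures", severity=severity,
                       title=f"Worker failed to spawn {n} times")]


# Registry is a plain list; a new distress kind is one function + one entry.
RULES = [repeated_spawn_failures]


def diagnose(task, events, runs, now, config):
    out = []
    for rule in RULES:
        try:
            out.extend(rule(task, events, runs, now, config))
        except Exception:
            continue  # broken-rule isolation: one bad rule can't hide the rest
    return out
```

Because rules are stateless and recomputed from current data, diagnostics "auto-clear" for free: once the triggering condition stops holding, the rule simply returns nothing.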

Every diagnostic carries structured ``actions`` (reclaim, reassign,
unblock, cli_hint, comment, open_docs) that render consistently in
both CLI and dashboard. Suggested actions are highlighted; generic
recovery actions (reclaim / reassign) are available on every kind as
fallbacks.

Diagnostics auto-clear when the underlying failure resolves — a
clean ``completed``/``edited`` event drops hallucination diagnostics,
a successful run drops crash diagnostics, a comment drops
stuck-blocked diagnostics. Audit events persist; the badge goes away.

API
---
``plugin_api.py``:
* ``/board`` now attaches ``diagnostics`` (full list) and
  ``warnings`` (compact summary with ``highest_severity``) per task.
* ``/tasks/{id}`` attaches diagnostics so the drawer's Diagnostics
  section auto-opens on flagged tasks.
* NEW ``/diagnostics`` endpoint — fleet-wide listing, filterable by
  severity, sorted critical-first.
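
As a sketch, the compact per-task summary is a pure fold over the active diagnostics — the helper name and exact field derivation below are illustrative, the field set follows the migration note:

```python
SEVERITY_ORDER = {"warning": 0, "error": 1, "critical": 2}


def warnings_summary(diagnostics):
    """Fold active diagnostics into the compact per-task 'warnings' shape.

    Hypothetical helper: field names follow the documented contract,
    the implementation is a sketch.
    """
    if not diagnostics:
        return None
    return {
        "count": len(diagnostics),
        "kinds": sorted({d["kind"] for d in diagnostics}),
        "latest_at": max(d["at"] for d in diagnostics),
        "highest_severity": max(
            (d["severity"] for d in diagnostics),
            key=SEVERITY_ORDER.__getitem__,
        ),
    }
```

The same ``highest_severity`` value drives both the card badge colour and the attention-strip colour, so the two surfaces can never disagree.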

CLI
---
* NEW ``hermes kanban diagnostics [--severity X] [--task id]
  [--json]`` — fleet view or single-task view, matches dashboard rule
  output so CLI users see the same picture.
* ``hermes kanban show <id>`` now renders a Diagnostics section near
  the top with severity markers + suggested actions.

Dashboard
---------
* Card badge is severity-coloured (⚠ amber warning, !! orange error,
  !!! red critical) using ``warnings.highest_severity``.
* Attention strip above the toolbar counts EVERY task with active
  diagnostics (not just hallucinations), severity-coloured, lists
  affected tasks with Open buttons when expanded.
* Drawer's old ``RecoverySection`` replaced with generic
  ``DiagnosticsSection`` rendering a card per active diagnostic:
  title + detail + structured data (task-id chips when payload keys
  look like id lists) + action buttons. Reassign profile picker is
  inline per-diagnostic. Clipboard fallback uses ``.catch()`` for
  environments where writeText rejects.
* Three-rung severity palette; amber for warning, orange for error,
  red for critical. Uses CSS variables so theming is straightforward.

Tests
-----
* NEW ``tests/hermes_cli/test_kanban_diagnostics.py`` — 14 unit tests
  covering each rule's positive/negative/threshold paths, severity
  sorting, broken-rule isolation, and sqlite3.Row integration.
* Dashboard plugin tests extended: ``/diagnostics`` endpoint (empty,
  populated, severity-filtered), ``/board`` exposes both diagnostic
  list and compact summary with ``highest_severity``.
* Existing hallucination-specific test
  (``test_board_surfaces_warnings_field_for_hallucinated_completions``)
  updated to reflect the new contract: warning summary keys by
  diagnostic kind (``hallucinated_cards``) not event kind.

379 kanban-suite tests pass (+16 net from this PR).

Live verification
-----------------
Seeded all 5 diagnostic kinds + one clean + one plain-running task
(7 total) into an isolated HERMES_HOME, spun up the dashboard, and
verified:

* Attention strip: shows ``!! 5 tasks need attention`` in the
  error-severity orange; Show expands to a list of 5 rows ordered
  critical > error > warning.
* Card badges: error tasks render ``!!`` orange, warning tasks
  render ``⚠`` amber, clean and plain-running tasks render no badge.
* Each of the 5 rules opens a correctly-coloured, correctly-styled
  diagnostic card in the drawer with its specific suggested action.
* Live reassign from a diagnostic card flipped
  ``broken-ml-worker → alice`` and the drawer refreshed with the
  new assignee + the same diagnostic still firing (correct:
  spawn_failures counter hasn't reset yet).
* CLI ``hermes kanban diagnostics`` prints all 5 in severity order;
  ``--severity error`` narrows to 3; ``kanban show <id>`` includes
  the Diagnostics block at the top with suggested action hint.

Migration note
--------------
The old ``warnings`` shape (``{count, kinds, latest_at}``) is
preserved on the API but ``kinds`` now keys by diagnostic kind
(``hallucinated_cards``) instead of event kind
(``completion_blocked_hallucination``). ``highest_severity`` is a
new required field. The dashboard was the only consumer and has
been updated in the same commit; external API consumers of the
``warnings`` field will need to update their kind-match logic.
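
For an external consumer, the required change is typically a one-line kind-match update; the surrounding helper here is hypothetical:

```python
def task_has_hallucination_warning(warnings):
    """Hypothetical external-consumer check against the warnings summary."""
    # Old contract matched the event kind:
    #   return "completion_blocked_hallucination" in warnings["kinds"]
    # New contract matches the diagnostic kind:
    return "hallucinated_cards" in warnings["kinds"]
```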

* feat(kanban/diagnostics): lead titles with the actual error text

The generic 'Worker crashed N runs in a row' / 'Worker failed to spawn
N times' titles buried the actual cause in the data section. Operators
had to open logs or expand the diagnostic to see WHY the worker is
stuck — rate-limit vs insufficient quota vs bad auth vs context
overflow vs network blip all looked identical at a glance.

New titles:

  Agent crashed 3x: openai: 429 Too Many Requests - rate limit reached
  Agent crashed 3x: anthropic: 402 insufficient_quota - credit balance
  Agent crashed 3x: provider auth error: 401 Unauthorized
  Agent spawn failed 4x: insufficient_quota: You exceeded your current

Detail keeps the full error snippet (capped at 500 chars + ellipsis
for tracebacks). Title takes the first line capped at 160 chars.
Fallback title if no error recorded stays honest ('no error recorded').
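
The derivation can be sketched as follows — the caps match the text above, while the function name and exact truncation marker are illustrative assumptions:

```python
TITLE_CAP = 160   # title takes the first line of the error, capped
DETAIL_CAP = 500  # detail keeps the full snippet, capped + ellipsis

def lead_with_error(prefix, error_text):
    """Sketch: derive (title, detail) from a recorded error (hypothetical)."""
    if not error_text:
        return f"{prefix}: no error recorded", ""
    first_line = error_text.splitlines()[0].strip()
    title = f"{prefix}: {first_line}"[:TITLE_CAP]
    if len(error_text) > DETAIL_CAP:
        detail = error_text[:DETAIL_CAP] + "…"
    else:
        detail = error_text
    return title, detail
```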

Tests: 4 new cases covering 429/billing/spawn/truncation. 383 total
pass (+4).

Live-verified on dashboard with 6 seeded scenarios
(rate-limit, billing, auth, context, network, spawn-billing) —
each card title leads with the actionable error text.
2026-05-05 13:32:42 -07:00

1442 lines
52 KiB
Python

"""Tests for the Kanban dashboard plugin backend (plugins/kanban/dashboard/plugin_api.py).
The plugin mounts as /api/plugins/kanban/ inside the dashboard's FastAPI app,
but here we attach its router to a bare FastAPI instance so we can test the
REST surface without spinning up the whole dashboard.
"""
from __future__ import annotations
import importlib.util
import os
import sys
import time
from pathlib import Path
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from hermes_cli import kanban_db as kb
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _load_plugin_router():
    """Dynamically load plugins/kanban/dashboard/plugin_api.py and return its router."""
    repo_root = Path(__file__).resolve().parents[2]
    plugin_file = repo_root / "plugins" / "kanban" / "dashboard" / "plugin_api.py"
    assert plugin_file.exists(), f"plugin file missing: {plugin_file}"
    spec = importlib.util.spec_from_file_location(
        "hermes_dashboard_plugin_kanban_test", plugin_file,
    )
    assert spec is not None and spec.loader is not None
    mod = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = mod
    spec.loader.exec_module(mod)
    return mod.router


@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Isolated HERMES_HOME with an empty kanban DB."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    kb.init_db()
    return home


@pytest.fixture
def client(kanban_home):
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    return TestClient(app)
# ---------------------------------------------------------------------------
# GET /board on an empty DB
# ---------------------------------------------------------------------------
def test_board_empty(client):
    r = client.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    data = r.json()
    # All canonical columns present (triage + the rest), each empty.
    names = [c["name"] for c in data["columns"]]
    for expected in ("triage", "todo", "ready", "running", "blocked", "done"):
        assert expected in names, f"missing column {expected}: {names}"
    assert all(len(c["tasks"]) == 0 for c in data["columns"])
    assert data["tenants"] == []
    assert data["assignees"] == []
    assert data["latest_event_id"] == 0
# ---------------------------------------------------------------------------
# POST /tasks then GET /board sees it
# ---------------------------------------------------------------------------
def test_create_task_appears_on_board(client):
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={
            "title": "Research LLM caching",
            "assignee": "researcher",
            "priority": 3,
            "tenant": "acme",
        },
    )
    assert r.status_code == 200, r.text
    task = r.json()["task"]
    assert task["title"] == "Research LLM caching"
    assert task["assignee"] == "researcher"
    assert task["status"] == "ready"  # no parents -> immediately ready
    assert task["priority"] == 3
    assert task["tenant"] == "acme"
    task_id = task["id"]
    # Board now lists it under 'ready'.
    r = client.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    data = r.json()
    ready = next(c for c in data["columns"] if c["name"] == "ready")
    assert len(ready["tasks"]) == 1
    assert ready["tasks"][0]["id"] == task_id
    assert "acme" in data["tenants"]
    assert "researcher" in data["assignees"]


def test_tenant_filter(client):
    client.post("/api/plugins/kanban/tasks", json={"title": "A", "tenant": "t1"})
    client.post("/api/plugins/kanban/tasks", json={"title": "B", "tenant": "t2"})
    r = client.get("/api/plugins/kanban/board?tenant=t1")
    counts = {c["name"]: len(c["tasks"]) for c in r.json()["columns"]}
    total = sum(counts.values())
    assert total == 1
    r = client.get("/api/plugins/kanban/board?tenant=t2")
    total = sum(len(c["tasks"]) for c in r.json()["columns"])
    assert total == 1
# ---------------------------------------------------------------------------
# GET /tasks/:id returns body + comments + events + links
# ---------------------------------------------------------------------------
def test_task_detail_includes_links_and_events(client):
    parent = client.post(
        "/api/plugins/kanban/tasks", json={"title": "parent"},
    ).json()["task"]
    child = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "child", "parents": [parent["id"]]},
    ).json()["task"]
    assert child["status"] == "todo"  # parent not done yet
    # Detail for the child shows the parent link.
    r = client.get(f"/api/plugins/kanban/tasks/{child['id']}")
    assert r.status_code == 200
    data = r.json()
    assert data["task"]["id"] == child["id"]
    assert parent["id"] in data["links"]["parents"]
    # Events exist from creation.
    assert len(data["events"]) >= 1
    # Detail for the parent shows the child.
    r = client.get(f"/api/plugins/kanban/tasks/{parent['id']}")
    assert child["id"] in r.json()["links"]["children"]


def test_task_detail_404_on_unknown(client):
    r = client.get("/api/plugins/kanban/tasks/does-not-exist")
    assert r.status_code == 404
# ---------------------------------------------------------------------------
# PATCH /tasks/:id — status transitions
# ---------------------------------------------------------------------------
def test_patch_status_complete(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "done", "result": "shipped"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "done"
    # Board reflects the move.
    done = next(
        c for c in client.get("/api/plugins/kanban/board").json()["columns"]
        if c["name"] == "done"
    )
    assert any(x["id"] == t["id"] for x in done["tasks"])


def test_patch_block_then_unblock(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "blocked", "block_reason": "need input"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "blocked"
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "ready"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "ready"


def test_patch_drag_drop_move_todo_to_ready(client):
    """Direct status write: the drag-drop path for statuses without a
    dedicated verb (e.g. manually promoting todo -> ready)."""
    parent = client.post("/api/plugins/kanban/tasks", json={"title": "p"}).json()["task"]
    child = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "c", "parents": [parent["id"]]},
    ).json()["task"]
    assert child["status"] == "todo"
    r = client.patch(
        f"/api/plugins/kanban/tasks/{child['id']}",
        json={"status": "ready"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "ready"


def test_patch_reassign(client):
    t = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "x", "assignee": "a"},
    ).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"assignee": "b"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["assignee"] == "b"


def test_patch_priority_and_edit(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"priority": 5, "title": "renamed"},
    )
    assert r.status_code == 200
    data = r.json()["task"]
    assert data["priority"] == 5
    assert data["title"] == "renamed"


def test_patch_invalid_status(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "banana"},
    )
    assert r.status_code == 400


def test_patch_status_running_rejected(client):
    """Dashboard PATCH cannot transition a task directly to 'running'.

    The only legitimate path into 'running' is through the dispatcher's
    ``claim_task`` — which atomically creates a ``task_runs`` row,
    claim_lock, expiry, and worker-PID metadata. Allowing a direct set
    creates orphaned 'running' tasks with no run row or claim, which
    violate the board's run-history invariants. See issue #19535.
    """
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "running"},
    )
    assert r.status_code == 400
    assert "running" in r.json()["detail"]
    # Task's status should still be its pre-request value — the direct-set
    # was rejected before any mutation.
    board = client.get("/api/plugins/kanban/board").json()
    statuses = {
        tt["id"]: col["name"]
        for col in board["columns"]
        for tt in col["tasks"]
    }
    assert statuses.get(t["id"]) != "running"
# ---------------------------------------------------------------------------
# Comments + Links
# ---------------------------------------------------------------------------
def test_add_comment(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.post(
        f"/api/plugins/kanban/tasks/{t['id']}/comments",
        json={"body": "how's progress?", "author": "teknium"},
    )
    assert r.status_code == 200
    r = client.get(f"/api/plugins/kanban/tasks/{t['id']}")
    comments = r.json()["comments"]
    assert len(comments) == 1
    assert comments[0]["body"] == "how's progress?"
    assert comments[0]["author"] == "teknium"


def test_add_comment_empty_rejected(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.post(
        f"/api/plugins/kanban/tasks/{t['id']}/comments",
        json={"body": " "},
    )
    assert r.status_code == 400


def test_add_link_and_delete_link(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    r = client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": a["id"], "child_id": b["id"]},
    )
    assert r.status_code == 200
    r = client.get(f"/api/plugins/kanban/tasks/{b['id']}")
    assert a["id"] in r.json()["links"]["parents"]
    r = client.delete(
        "/api/plugins/kanban/links",
        params={"parent_id": a["id"], "child_id": b["id"]},
    )
    assert r.status_code == 200
    assert r.json()["ok"] is True


def test_add_link_cycle_rejected(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": a["id"], "child_id": b["id"]},
    )
    r = client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": b["id"], "child_id": a["id"]},
    )
    assert r.status_code == 400
# ---------------------------------------------------------------------------
# Dispatch nudge
# ---------------------------------------------------------------------------
def test_dispatch_dry_run(client):
    client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "work", "assignee": "researcher"},
    )
    r = client.post("/api/plugins/kanban/dispatch?dry_run=true&max=4")
    assert r.status_code == 200
    body = r.json()
    # DispatchResult is serialized as a dataclass dict.
    assert isinstance(body, dict)
# ---------------------------------------------------------------------------
# Triage column (new v1 status)
# ---------------------------------------------------------------------------
def test_create_triage_lands_in_triage_column(client):
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "rough idea, spec me", "triage": True},
    )
    assert r.status_code == 200
    task = r.json()["task"]
    assert task["status"] == "triage"
    r = client.get("/api/plugins/kanban/board")
    triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
    assert len(triage["tasks"]) == 1
    assert triage["tasks"][0]["title"] == "rough idea, spec me"


def test_triage_task_not_promoted_to_ready(client):
    """Triage tasks must stay in triage even when they have no parents."""
    client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "must stay put", "triage": True},
    )
    # Run the dispatcher — it should NOT promote the triage task.
    client.post("/api/plugins/kanban/dispatch?dry_run=false&max=4")
    r = client.get("/api/plugins/kanban/board")
    triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
    ready = next(c for c in r.json()["columns"] if c["name"] == "ready")
    assert len(triage["tasks"]) == 1
    assert len(ready["tasks"]) == 0


def test_patch_status_triage_works(client):
    """A user (or specifier) can push a task back into triage, and out of it."""
    t = client.post(
        "/api/plugins/kanban/tasks", json={"title": "x"},
    ).json()["task"]
    # Normal creation is 'ready'; push to triage.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "triage"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "triage"
    # Now promote to todo.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "todo"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "todo"
# ---------------------------------------------------------------------------
# Progress rollup (done children / total children)
# ---------------------------------------------------------------------------
def test_board_progress_rollup(client):
    parent = client.post(
        "/api/plugins/kanban/tasks", json={"title": "parent"},
    ).json()["task"]
    child_a = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "a", "parents": [parent["id"]]},
    ).json()["task"]
    child_b = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "b", "parents": [parent["id"]]},
    ).json()["task"]
    # Children start as "todo" because the parent isn't done yet; promote
    # them to "ready" so complete_task will accept the transition.
    for cid in (child_a["id"], child_b["id"]):
        r = client.patch(
            f"/api/plugins/kanban/tasks/{cid}", json={"status": "ready"},
        )
        assert r.status_code == 200
    # 0/2 done.
    r = client.get("/api/plugins/kanban/board")
    parent_row = next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == parent["id"]
    )
    assert parent_row["progress"] == {"done": 0, "total": 2}
    # Complete one child. 1/2.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{child_a['id']}",
        json={"status": "done"},
    )
    assert r.status_code == 200
    r = client.get("/api/plugins/kanban/board")
    parent_row = next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == parent["id"]
    )
    assert parent_row["progress"] == {"done": 1, "total": 2}
    # Childless tasks report progress=None, not {0/0}.
    assert next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == child_b["id"]
    )["progress"] is None
# ---------------------------------------------------------------------------
# Auto-init on first board read
# ---------------------------------------------------------------------------
def test_board_auto_initializes_missing_db(tmp_path, monkeypatch):
    """If kanban.db doesn't exist yet, GET /board must create it, not 500."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Deliberately DO NOT call kb.init_db().
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    c = TestClient(app)
    r = c.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    assert (home / "kanban.db").exists(), "init_db wasn't invoked by /board"
# ---------------------------------------------------------------------------
# WebSocket auth (query-param token)
# ---------------------------------------------------------------------------
def test_ws_events_rejects_when_token_required(tmp_path, monkeypatch):
    """When _SESSION_TOKEN is set (normal dashboard context), a missing or
    wrong ?token= query param must be rejected with policy-violation."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    kb.init_db()
    # Stub web_server so _check_ws_token has a token to compare against.
    import hermes_cli
    import types
    stub = types.SimpleNamespace(_SESSION_TOKEN="secret-xyz")
    monkeypatch.setitem(sys.modules, "hermes_cli.web_server", stub)
    monkeypatch.setattr(hermes_cli, "web_server", stub, raising=False)
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    c = TestClient(app)
    # No token → policy violation close.
    from starlette.websockets import WebSocketDisconnect
    with pytest.raises(WebSocketDisconnect) as exc:
        with c.websocket_connect("/api/plugins/kanban/events"):
            pass
    assert exc.value.code == 1008
    # Wrong token → policy violation close.
    with pytest.raises(WebSocketDisconnect) as exc:
        with c.websocket_connect("/api/plugins/kanban/events?token=nope"):
            pass
    assert exc.value.code == 1008
    # Correct token → accepted (connect then close cleanly from our side).
    with c.websocket_connect(
        "/api/plugins/kanban/events?token=secret-xyz"
    ) as ws:
        assert ws is not None  # handshake succeeded
# ---------------------------------------------------------------------------
# Bulk actions
# ---------------------------------------------------------------------------
def test_bulk_status_ready(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
    # Parent-less tasks land in "ready" already; push them to blocked first.
    for tid in (a["id"], b["id"], c2["id"]):
        client.patch(f"/api/plugins/kanban/tasks/{tid}",
                     json={"status": "blocked", "block_reason": "wait"})
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"], c2["id"]], "status": "ready"})
    assert r.status_code == 200
    results = r.json()["results"]
    assert all(res["ok"] for res in results)
    # All three are now ready.
    board = client.get("/api/plugins/kanban/board").json()
    ready = next(col for col in board["columns"] if col["name"] == "ready")
    ids = {t["id"] for t in ready["tasks"]}
    assert {a["id"], b["id"], c2["id"]}.issubset(ids)


def test_bulk_status_done_forwards_completion_summary(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    r = client.post(
        "/api/plugins/kanban/tasks/bulk",
        json={
            "ids": [a["id"], b["id"]],
            "status": "done",
            "result": "DECIDED: ship it",
            "summary": "DECIDED: ship it",
            "metadata": {"source": "dashboard"},
        },
    )
    assert r.status_code == 200
    assert all(res["ok"] for res in r.json()["results"])
    conn = kb.connect()
    try:
        for tid in (a["id"], b["id"]):
            task = kb.get_task(conn, tid)
            run = kb.latest_run(conn, tid)
            assert task.status == "done"
            assert task.result == "DECIDED: ship it"
            assert run.summary == "DECIDED: ship it"
            assert run.metadata == {"source": "dashboard"}
    finally:
        conn.close()


def test_dashboard_done_actions_prompt_for_completion_summary():
    repo_root = Path(__file__).resolve().parents[2]
    bundle = (
        repo_root / "plugins" / "kanban" / "dashboard" / "dist" / "index.js"
    ).read_text()
    assert "withCompletionSummary" in bundle
    assert "Completion summary" in bundle
    assert "result: summary" in bundle
    assert "body: JSON.stringify(patch)" in bundle
    assert "body: JSON.stringify(finalPatch)" in bundle


def test_bulk_archive(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"]], "archive": True})
    assert r.status_code == 200
    assert all(res["ok"] for res in r.json()["results"])
    # Default board (archived hidden) — both gone.
    board = client.get("/api/plugins/kanban/board").json()
    ids = {t["id"] for col in board["columns"] for t in col["tasks"]}
    assert a["id"] not in ids
    assert b["id"] not in ids


def test_bulk_reassign(client):
    a = client.post("/api/plugins/kanban/tasks",
                    json={"title": "a", "assignee": "old"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks",
                    json={"title": "b", "assignee": "old"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"]], "assignee": "new"})
    assert r.status_code == 200
    for tid in (a["id"], b["id"]):
        t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
        assert t["assignee"] == "new"


def test_bulk_unassign_via_empty_string(client):
    a = client.post("/api/plugins/kanban/tasks",
                    json={"title": "a", "assignee": "x"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"]], "assignee": ""})
    assert r.status_code == 200
    t = client.get(f"/api/plugins/kanban/tasks/{a['id']}").json()["task"]
    assert t["assignee"] is None


def test_bulk_partial_failure_doesnt_abort_siblings(client):
    """One bad id in the middle of a batch must not prevent others from
    applying."""
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], "bogus-id", c2["id"]], "priority": 7})
    assert r.status_code == 200
    results = r.json()["results"]
    assert len(results) == 3
    ok_ids = {res["id"] for res in results if res["ok"]}
    assert a["id"] in ok_ids
    assert c2["id"] in ok_ids
    assert any(not res["ok"] and res["id"] == "bogus-id" for res in results)
    # Good siblings actually got the priority bump.
    for tid in (a["id"], c2["id"]):
        t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
        assert t["priority"] == 7


def test_bulk_empty_ids_400(client):
    r = client.post("/api/plugins/kanban/tasks/bulk", json={"ids": []})
    assert r.status_code == 400
# ---------------------------------------------------------------------------
# /config endpoint
# ---------------------------------------------------------------------------
def test_config_returns_defaults_when_section_missing(client):
    r = client.get("/api/plugins/kanban/config")
    assert r.status_code == 200
    data = r.json()
    # Defaults when dashboard.kanban is missing.
    assert data["default_tenant"] == ""
    assert data["lane_by_profile"] is True
    assert data["include_archived_by_default"] is False
    assert data["render_markdown"] is True


def test_config_reads_dashboard_kanban_section(tmp_path, monkeypatch, client):
    home = Path(os.environ["HERMES_HOME"])
    (home / "config.yaml").write_text(
        "dashboard:\n"
        "  kanban:\n"
        "    default_tenant: acme\n"
        "    lane_by_profile: false\n"
        "    include_archived_by_default: true\n"
        "    render_markdown: false\n"
    )
    r = client.get("/api/plugins/kanban/config")
    assert r.status_code == 200
    data = r.json()
    assert data["default_tenant"] == "acme"
    assert data["lane_by_profile"] is False
    assert data["include_archived_by_default"] is True
    assert data["render_markdown"] is False
# ---------------------------------------------------------------------------
# Runs surfacing (vulcan-artivus RFC feedback)
# ---------------------------------------------------------------------------
def test_task_detail_includes_runs(client):
    """GET /tasks/:id carries a runs[] array with the attempt history."""
    r = client.post("/api/plugins/kanban/tasks",
                    json={"title": "port x", "assignee": "worker"}).json()
    tid = r["task"]["id"]
    # Drive status running to force a run creation: PATCH to running
    # doesn't call claim_task (the PATCH path uses _set_status_direct),
    # so use the bulk/claim indirection via the kernel.
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
        kb.complete_task(
            conn, tid,
            result="done",
            summary="tested on rate limiter",
            metadata={"changed_files": ["limiter.py"]},
        )
    finally:
        conn.close()
    d = client.get(f"/api/plugins/kanban/tasks/{tid}").json()
    assert "runs" in d
    assert len(d["runs"]) == 1
    run = d["runs"][0]
    assert run["outcome"] == "completed"
    assert run["profile"] == "worker"
    assert run["summary"] == "tested on rate limiter"
    assert run["metadata"] == {"changed_files": ["limiter.py"]}
    assert run["ended_at"] is not None


def test_task_detail_runs_empty_before_claim(client):
    """A task that's never been claimed has an empty runs[] list, not
    a missing key."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "fresh"}).json()
    d = client.get(f"/api/plugins/kanban/tasks/{r['task']['id']}").json()
    assert d["runs"] == []


def test_patch_status_done_with_summary_and_metadata(client):
    """PATCH /tasks/:id with status=done + summary + metadata must
    reach complete_task, so the dashboard has CLI parity."""
    # Create + claim.
    r = client.post("/api/plugins/kanban/tasks", json={"title": "x", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
    finally:
        conn.close()
    r = client.patch(
        f"/api/plugins/kanban/tasks/{tid}",
        json={
            "status": "done",
            "summary": "shipped the thing",
            "metadata": {"changed_files": ["a.py", "b.py"], "tests_run": 7},
        },
    )
    assert r.status_code == 200, r.text
    # The run must have the summary + metadata attached.
    conn = kb.connect()
    try:
        run = kb.latest_run(conn, tid)
        assert run.outcome == "completed"
        assert run.summary == "shipped the thing"
        assert run.metadata == {"changed_files": ["a.py", "b.py"], "tests_run": 7}
    finally:
        conn.close()


def test_patch_status_done_without_summary_still_works(client):
    """Back-compat: PATCH without the new fields still completes."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "y", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
    finally:
        conn.close()
    r = client.patch(
        f"/api/plugins/kanban/tasks/{tid}",
        json={"status": "done", "result": "legacy shape"},
    )
    assert r.status_code == 200, r.text
    conn = kb.connect()
    try:
        run = kb.latest_run(conn, tid)
        assert run.outcome == "completed"
        assert run.summary == "legacy shape"  # falls back to result
    finally:
        conn.close()


def test_patch_status_archive_closes_running_run(client):
    """PATCH to archived while running must close the in-flight run."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "z", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
        open_run = kb.latest_run(conn, tid)
        assert open_run.ended_at is None
    finally:
        conn.close()
    r = client.patch(
        f"/api/plugins/kanban/tasks/{tid}",
        json={"status": "archived"},
    )
    assert r.status_code == 200, r.text
    conn = kb.connect()
    try:
        task = kb.get_task(conn, tid)
        assert task.status == "archived"
        assert task.current_run_id is None
        assert kb.latest_run(conn, tid).outcome == "reclaimed"
    finally:
        conn.close()


def test_event_dict_includes_run_id(client):
    """GET /tasks/:id returns events with run_id populated."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "e", "assignee": "worker"})
    tid = r.json()["task"]["id"]
    conn = kb.connect()
    try:
        kb.claim_task(conn, tid)
        run_id = kb.latest_run(conn, tid).id
        kb.complete_task(conn, tid, summary="wss")
    finally:
        conn.close()
    r = client.get(f"/api/plugins/kanban/tasks/{tid}")
    assert r.status_code == 200
    events = r.json()["events"]
    # Every event in the response must have a run_id key (None or int).
for e in events:
assert "run_id" in e, f"missing run_id in event: {e}"
# completed event must have the actual run_id.
comp = [e for e in events if e["kind"] == "completed"]
assert comp[0]["run_id"] == run_id
# ---------------------------------------------------------------------------
# Per-task force-loaded skills via REST
# ---------------------------------------------------------------------------
def test_create_task_with_skills_roundtrips(client):
"""POST /tasks accepts `skills: [...]`, GET /tasks/:id returns it."""
r = client.post(
"/api/plugins/kanban/tasks",
json={
"title": "translate docs",
"assignee": "linguist",
"skills": ["translation", "github-code-review"],
},
)
assert r.status_code == 200, r.text
task = r.json()["task"]
assert task["skills"] == ["translation", "github-code-review"]
# Fetch via GET /tasks/:id as the drawer does.
got = client.get(f"/api/plugins/kanban/tasks/{task['id']}").json()
assert got["task"]["skills"] == ["translation", "github-code-review"]
def test_create_task_without_skills_defaults_to_empty_list(client):
"""_task_dict serializes Task.skills=None as [] so the drawer can
always .length check without guarding against null."""
r = client.post(
"/api/plugins/kanban/tasks",
json={"title": "no skills", "assignee": "x"},
)
assert r.status_code == 200, r.text
task = r.json()["task"]
# Task.skills is None in-memory; _task_dict serializes via
# dataclasses.asdict which keeps it None. The drawer's
# `t.skills && t.skills.length > 0` guard handles both null and [].
assert task.get("skills") in (None, [])
# ---------------------------------------------------------------------------
# Dispatcher-presence warning in POST /tasks response
# ---------------------------------------------------------------------------
def test_create_task_includes_warning_when_no_dispatcher(client, monkeypatch):
"""ready+assigned task + no gateway -> response has `warning` field
so the dashboard UI can surface a banner."""
# Force the dispatcher probe to report "not running".
monkeypatch.setattr(
"hermes_cli.kanban._check_dispatcher_presence",
lambda: (False, "No gateway is running — start `hermes gateway start`."),
)
r = client.post(
"/api/plugins/kanban/tasks",
json={"title": "warn-me", "assignee": "worker"},
)
assert r.status_code == 200
data = r.json()
assert data.get("warning")
assert "gateway" in data["warning"].lower()
def test_create_task_no_warning_when_dispatcher_up(client, monkeypatch):
"""Dispatcher running -> no `warning` field in the response."""
monkeypatch.setattr(
"hermes_cli.kanban._check_dispatcher_presence",
lambda: (True, ""),
)
r = client.post(
"/api/plugins/kanban/tasks",
json={"title": "silent", "assignee": "worker"},
)
assert r.status_code == 200
assert "warning" not in r.json() or not r.json()["warning"]
def test_create_task_no_warning_on_triage(client, monkeypatch):
"""Triage tasks never get the warning (they can't be dispatched
anyway until promoted)."""
monkeypatch.setattr(
"hermes_cli.kanban._check_dispatcher_presence",
lambda: (False, "oh no"),
)
r = client.post(
"/api/plugins/kanban/tasks",
json={"title": "triage-task", "assignee": "worker", "triage": True},
)
assert r.status_code == 200
assert "warning" not in r.json() or not r.json()["warning"]
def test_create_task_probe_error_does_not_break_create(client, monkeypatch):
"""Probe failure must never break task creation."""
def _raise():
raise RuntimeError("probe crashed")
monkeypatch.setattr(
"hermes_cli.kanban._check_dispatcher_presence", _raise,
)
r = client.post(
"/api/plugins/kanban/tasks",
json={"title": "resilient", "assignee": "worker"},
)
assert r.status_code == 200
assert r.json()["task"]["title"] == "resilient"
# ---------------------------------------------------------------------------
# Home-channel subscription endpoints (#19534 follow-up: GUI opt-in)
# ---------------------------------------------------------------------------
#
# Dashboard surface for per-task, per-platform notification toggles. The
# backend endpoints read the live GatewayConfig, so tests set env vars
# (BOT_TOKEN + HOME_CHANNEL) to simulate a user who has run /sethome on
# telegram and discord.
@pytest.fixture
def with_home_channels(monkeypatch):
"""Simulate a user with home channels set on telegram and discord."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "abc:fake")
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "1234567")
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL_THREAD_ID", "42")
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL_NAME", "Main TG")
monkeypatch.setenv("DISCORD_BOT_TOKEN", "disc_fake")
monkeypatch.setenv("DISCORD_HOME_CHANNEL", "9999999")
monkeypatch.setenv("DISCORD_HOME_CHANNEL_NAME", "Main Discord")
# Slack has a token but NO home — should be excluded from the list.
monkeypatch.setenv("SLACK_BOT_TOKEN", "slack_fake")
def test_home_channels_lists_only_platforms_with_home(client, with_home_channels):
"""GET /home-channels returns entries only for platforms where the
user has set a home; untoggled-subscribed bool is false by default."""
r = client.get("/api/plugins/kanban/home-channels")
assert r.status_code == 200
platforms = {h["platform"] for h in r.json()["home_channels"]}
assert platforms == {"telegram", "discord"}, (
f"slack has a token but no home — must not appear. got {platforms}"
)
for h in r.json()["home_channels"]:
assert h["subscribed"] is False
def test_home_channels_no_task_id_all_unsubscribed(client, with_home_channels):
"""Without task_id, every entry's subscribed=false (UI "no task" state)."""
r = client.get("/api/plugins/kanban/home-channels")
assert r.status_code == 200
assert all(not h["subscribed"] for h in r.json()["home_channels"])
def test_home_subscribe_creates_notify_sub_row(client, with_home_channels):
"""POST .../home-subscribe/telegram writes a kanban_notify_subs row
keyed to the telegram home's (chat_id, thread_id)."""
from hermes_cli import kanban_db as kb
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
assert r.status_code == 200
assert r.json()["ok"] is True
conn = kb.connect()
try:
subs = kb.list_notify_subs(conn, t["id"])
finally:
conn.close()
assert len(subs) == 1
assert subs[0]["platform"] == "telegram"
assert subs[0]["chat_id"] == "1234567"
assert subs[0]["thread_id"] == "42"
def test_home_subscribe_flips_subscribed_flag_in_subsequent_get(client, with_home_channels):
"""After subscribe, the GET endpoint reports subscribed=true for that
platform and false for the others."""
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
r = client.get(f"/api/plugins/kanban/home-channels?task_id={t['id']}")
flags = {h["platform"]: h["subscribed"] for h in r.json()["home_channels"]}
assert flags == {"telegram": True, "discord": False}
def test_home_subscribe_is_idempotent(client, with_home_channels):
"""Re-subscribing keeps a single row at the DB layer."""
from hermes_cli import kanban_db as kb
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
conn = kb.connect()
try:
assert len(kb.list_notify_subs(conn, t["id"])) == 1
finally:
conn.close()
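

# Illustrative sketch (assumed schema, not the real kanban_notify_subs
# DDL): the idempotency asserted above falls out of keying rows by
# (task_id, platform, chat_id, thread_id) and inserting with
# INSERT OR IGNORE against a UNIQUE index.
import sqlite3


def test_notify_sub_idempotency_sketch():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE subs (task_id TEXT, platform TEXT, chat_id TEXT, "
        "thread_id TEXT, UNIQUE(task_id, platform, chat_id, thread_id))"
    )
    for _ in range(3):
        conn.execute(
            "INSERT OR IGNORE INTO subs VALUES (?, ?, ?, ?)",
            ("t_1", "telegram", "1234567", "42"),
        )
    assert conn.execute("SELECT COUNT(*) FROM subs").fetchone()[0] == 1
    conn.close()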
def test_home_subscribe_unknown_platform_returns_404(client, with_home_channels):
"""Platforms without a home configured (slack in the fixture) return 404."""
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/slack")
assert r.status_code == 404
assert "slack" in r.json()["detail"]
def test_home_subscribe_unknown_task_returns_404(client, with_home_channels):
r = client.post("/api/plugins/kanban/tasks/t_nonexistent/home-subscribe/telegram")
assert r.status_code == 404
def test_home_unsubscribe_removes_notify_sub_row(client, with_home_channels):
"""DELETE .../home-subscribe/telegram removes the matching row."""
from hermes_cli import kanban_db as kb
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
r = client.delete(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
assert r.status_code == 200
conn = kb.connect()
try:
assert kb.list_notify_subs(conn, t["id"]) == []
finally:
conn.close()
def test_home_subscribe_multiple_platforms_independent(client, with_home_channels):
"""Subscribing on telegram does not affect discord and vice versa."""
from hermes_cli import kanban_db as kb
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
client.post(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/discord")
conn = kb.connect()
try:
subs = {s["platform"]: s for s in kb.list_notify_subs(conn, t["id"])}
finally:
conn.close()
assert set(subs) == {"telegram", "discord"}
# Unsubscribe telegram only.
client.delete(f"/api/plugins/kanban/tasks/{t['id']}/home-subscribe/telegram")
conn = kb.connect()
try:
subs = {s["platform"]: s for s in kb.list_notify_subs(conn, t["id"])}
finally:
conn.close()
assert set(subs) == {"discord"}
def test_home_channels_empty_when_no_homes_configured(client, monkeypatch):
"""Zero platforms with a home -> empty list (UI hides the section)."""
# No BOT_TOKEN env vars set → load_gateway_config().platforms is empty.
for var in [
"TELEGRAM_BOT_TOKEN", "TELEGRAM_HOME_CHANNEL",
"DISCORD_BOT_TOKEN", "DISCORD_HOME_CHANNEL",
"SLACK_BOT_TOKEN",
]:
monkeypatch.delenv(var, raising=False)
r = client.get("/api/plugins/kanban/home-channels")
assert r.status_code == 200
assert r.json()["home_channels"] == []
# ---------------------------------------------------------------------------
# Recovery endpoints (reclaim + reassign) and warnings field
# ---------------------------------------------------------------------------
def test_board_surfaces_warnings_field_for_hallucinated_completions(client):
"""Tasks with a pending completion_blocked_hallucination event surface
a ``warnings`` object on the /board payload so the UI can badge
them without fetching per-task events. The warnings summary is
keyed by diagnostic kind (``hallucinated_cards``) rather than the
raw event kind — see hermes_cli.kanban_diagnostics for the rule
that produces it.
"""
conn = kb.connect()
try:
parent = kb.create_task(conn, title="parent", assignee="alice")
real = kb.create_task(conn, title="real", assignee="x", created_by="alice")
        with pytest.raises(kb.HallucinatedCardsError):
kb.complete_task(
conn, parent,
summary="claimed phantom",
created_cards=[real, "t_deadbeefcafe"],
)
finally:
conn.close()
r = client.get("/api/plugins/kanban/board")
assert r.status_code == 200
data = r.json()
tasks = [t for col in data["columns"] for t in col["tasks"]]
parent_dict = next(t for t in tasks if t["title"] == "parent")
assert parent_dict.get("warnings") is not None
w = parent_dict["warnings"]
assert w["count"] >= 1
assert "hallucinated_cards" in w["kinds"]
assert w["highest_severity"] == "error"
# Full diagnostic list also on the payload for drawer rendering.
assert parent_dict.get("diagnostics") is not None
assert parent_dict["diagnostics"][0]["kind"] == "hallucinated_cards"
assert "t_deadbeefcafe" in parent_dict["diagnostics"][0]["data"]["phantom_ids"]
def test_board_warnings_cleared_after_clean_completion(client):
"""A completed or edited event after a hallucination event clears
the warning badge — we don't mark tasks permanently."""
conn = kb.connect()
try:
parent = kb.create_task(conn, title="parent", assignee="alice")
real = kb.create_task(conn, title="real", assignee="x", created_by="alice")
        with pytest.raises(kb.HallucinatedCardsError):
kb.complete_task(
conn, parent,
summary="first attempt phantom",
created_cards=[real, "t_phantom11"],
)
# Second attempt drops the bad id — succeeds.
ok = kb.complete_task(
conn, parent,
summary="retry without phantom",
created_cards=[real],
)
assert ok is True
finally:
conn.close()
r = client.get("/api/plugins/kanban/board", params={"include_archived": True})
assert r.status_code == 200
data = r.json()
tasks = [t for col in data["columns"] for t in col["tasks"]]
parent_dict = next(t for t in tasks if t["title"] == "parent")
# The clean completion wiped the warning.
assert parent_dict.get("warnings") is None
def test_reclaim_endpoint_releases_running_claim(client):
"""POST /tasks/<id>/reclaim drops the claim, returns ok, and emits
a manual reclaimed event."""
import secrets
conn = kb.connect()
try:
t = kb.create_task(conn, title="running", assignee="x")
lock = secrets.token_hex(8)
future = int(time.time()) + 3600
conn.execute(
"UPDATE tasks SET status='running', claim_lock=?, claim_expires=?, "
"worker_pid=? WHERE id=?",
(lock, future, 99999, t),
)
conn.execute(
"INSERT INTO task_runs (task_id, status, claim_lock, claim_expires, "
"worker_pid, started_at) VALUES (?, 'running', ?, ?, ?, ?)",
(t, lock, future, 99999, int(time.time())),
)
run_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.execute("UPDATE tasks SET current_run_id=? WHERE id=?", (run_id, t))
conn.commit()
finally:
conn.close()
r = client.post(
f"/api/plugins/kanban/tasks/{t}/reclaim",
json={"reason": "browser recovery"},
)
assert r.status_code == 200, r.text
body = r.json()
assert body["ok"] is True
assert body["task_id"] == t
# Confirm the task is back to ready.
conn2 = kb.connect()
try:
row = conn2.execute(
"SELECT status, claim_lock FROM tasks WHERE id=?", (t,),
).fetchone()
assert row["status"] == "ready"
assert row["claim_lock"] is None
finally:
conn2.close()
def test_reclaim_endpoint_409_for_non_running_task(client):
"""Reclaiming a task that's already ready returns 409."""
conn = kb.connect()
try:
t = kb.create_task(conn, title="ready", assignee="x")
finally:
conn.close()
r = client.post(
f"/api/plugins/kanban/tasks/{t}/reclaim",
json={},
)
assert r.status_code == 409
def test_reassign_endpoint_switches_profile(client):
"""POST /tasks/<id>/reassign changes the assignee field."""
conn = kb.connect()
try:
t = kb.create_task(conn, title="task", assignee="orig")
finally:
conn.close()
r = client.post(
f"/api/plugins/kanban/tasks/{t}/reassign",
json={"profile": "newbie", "reclaim_first": False},
)
assert r.status_code == 200, r.text
assert r.json()["assignee"] == "newbie"
conn2 = kb.connect()
try:
row = conn2.execute(
"SELECT assignee FROM tasks WHERE id=?", (t,),
).fetchone()
assert row["assignee"] == "newbie"
finally:
conn2.close()
def test_reassign_endpoint_409_on_running_without_reclaim(client):
"""Reassigning a running task without reclaim_first returns 409."""
import secrets
conn = kb.connect()
try:
t = kb.create_task(conn, title="running", assignee="orig")
conn.execute(
"UPDATE tasks SET status='running', claim_lock=? WHERE id=?",
(secrets.token_hex(4), t),
)
conn.commit()
finally:
conn.close()
r = client.post(
f"/api/plugins/kanban/tasks/{t}/reassign",
json={"profile": "new", "reclaim_first": False},
)
assert r.status_code == 409
def test_reassign_endpoint_with_reclaim_first_succeeds_on_running(client):
"""With reclaim_first=true, a running task is reclaimed+reassigned in
one call."""
import secrets
conn = kb.connect()
try:
t = kb.create_task(conn, title="running", assignee="orig")
lock = secrets.token_hex(4)
conn.execute(
"UPDATE tasks SET status='running', claim_lock=?, claim_expires=?, "
"worker_pid=? WHERE id=?",
(lock, int(time.time()) + 3600, 1234, t),
)
conn.execute(
"INSERT INTO task_runs (task_id, status, claim_lock, claim_expires, "
"worker_pid, started_at) VALUES (?, 'running', ?, ?, ?, ?)",
(t, lock, int(time.time()) + 3600, 1234, int(time.time())),
)
rid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.execute("UPDATE tasks SET current_run_id=? WHERE id=?", (rid, t))
conn.commit()
finally:
conn.close()
r = client.post(
f"/api/plugins/kanban/tasks/{t}/reassign",
json={"profile": "new", "reclaim_first": True, "reason": "switch"},
)
assert r.status_code == 200, r.text
assert r.json()["assignee"] == "new"
conn2 = kb.connect()
try:
row = conn2.execute(
"SELECT status, assignee FROM tasks WHERE id=?", (t,),
).fetchone()
assert row["status"] == "ready"
assert row["assignee"] == "new"
finally:
conn2.close()
# ---------------------------------------------------------------------------
# Diagnostics endpoint (/api/plugins/kanban/diagnostics)
# ---------------------------------------------------------------------------
def test_diagnostics_endpoint_empty_for_clean_board(client):
r = client.get("/api/plugins/kanban/diagnostics")
assert r.status_code == 200
data = r.json()
assert data["count"] == 0
assert data["diagnostics"] == []
def test_diagnostics_endpoint_surfaces_blocked_hallucination(client):
conn = kb.connect()
try:
parent = kb.create_task(conn, title="parent", assignee="alice")
real = kb.create_task(conn, title="real", assignee="x", created_by="alice")
        with pytest.raises(kb.HallucinatedCardsError):
kb.complete_task(
conn, parent, summary="phantom",
created_cards=[real, "t_ffff00001234"],
)
finally:
conn.close()
r = client.get("/api/plugins/kanban/diagnostics")
assert r.status_code == 200
data = r.json()
assert data["count"] == 1
row = data["diagnostics"][0]
assert row["task_id"] == parent
assert row["diagnostics"][0]["kind"] == "hallucinated_cards"
assert row["diagnostics"][0]["severity"] == "error"
assert "t_ffff00001234" in row["diagnostics"][0]["data"]["phantom_ids"]
def test_diagnostics_endpoint_severity_filter(client):
"""Warning-severity filter excludes error-severity entries."""
conn = kb.connect()
try:
# A warning-severity diagnostic (prose phantom) on one task.
# Phantom id must be valid hex — the prose scanner regex
# requires ``t_[a-f0-9]{8,}``.
p1 = kb.create_task(conn, title="prose", assignee="a")
kb.complete_task(conn, p1, summary="mentioned t_deadbeef1234")
        # An error-severity diagnostic (spawn failures) on another task.
p2 = kb.create_task(conn, title="spawn", assignee="b")
conn.execute(
"UPDATE tasks SET spawn_failures=5, last_spawn_error='x' WHERE id=?",
(p2,),
)
conn.commit()
finally:
conn.close()
r = client.get("/api/plugins/kanban/diagnostics?severity=warning")
assert r.status_code == 200
data = r.json()
assert data["count"] == 1
assert data["diagnostics"][0]["task_id"] == p1
r = client.get("/api/plugins/kanban/diagnostics?severity=error")
data = r.json()
assert data["count"] == 1
assert data["diagnostics"][0]["task_id"] == p2
def test_board_exposes_diagnostics_list_and_summary(client):
"""/board should attach both the full diagnostics list AND the
compact warnings summary (with highest_severity) on each task
that has any diagnostic.
"""
conn = kb.connect()
try:
t = kb.create_task(conn, title="crashy", assignee="worker")
        # Simulate two consecutive crashes -> repeated_crashes error diagnostic.
        for _ in range(2):
conn.execute(
"INSERT INTO task_runs (task_id, status, outcome, started_at, "
"ended_at, error) VALUES (?, 'crashed', 'crashed', ?, ?, ?)",
(t, int(time.time()) - 100, int(time.time()) - 50, "OOM"),
)
conn.commit()
finally:
conn.close()
r = client.get("/api/plugins/kanban/board")
data = r.json()
tasks = [x for col in data["columns"] for x in col["tasks"]]
task_dict = next(x for x in tasks if x["title"] == "crashy")
assert task_dict["warnings"] is not None
assert task_dict["warnings"]["highest_severity"] == "error"
assert task_dict["diagnostics"][0]["kind"] == "repeated_crashes"