feat(kanban): hallucination gate + recovery UX for worker-created-card claims (#20232)

Workers completing a kanban task can now claim the ids of cards they
created via an optional ``created_cards`` field on ``kanban_complete``.
The kernel verifies each id exists and was created by the completing
worker's profile; any phantom id blocks the completion with a
``HallucinatedCardsError`` and records a
``completion_blocked_hallucination`` event on the task so the rejected
attempt is auditable. Successful completions also get a non-blocking
prose-scan pass over their ``summary`` + ``result`` that emits a
``suspected_hallucinated_references`` event for any ``t_<hex>``
reference that doesn't resolve.

Closes #20017.

Recovery UX (kernel + CLI + dashboard)
--------------------------------------

A structural gate alone isn't enough — operators also need to see and
act on stuck workers, especially when a profile's model is the root
cause. This PR ships the full loop:

* ``kanban_db.reclaim_task(task_id)`` — operator-driven reclaim that
  releases an active worker claim immediately (unlike
  ``release_stale_claims`` which only acts after claim_expires has
  passed). Emits a ``reclaimed`` event with ``manual: True`` payload.
* ``kanban_db.reassign_task(task_id, profile, reclaim_first=...)`` —
  switch a task to a different profile, optionally reclaiming a stuck
  running worker in the same call.
* ``hermes kanban reclaim <id> [--reason ...]`` and
  ``hermes kanban reassign <id> <profile> [--reclaim] [--reason ...]``
  CLI subcommands wired through to the same helpers.
* ``POST /api/plugins/kanban/tasks/{id}/reclaim`` and
  ``POST /api/plugins/kanban/tasks/{id}/reassign`` endpoints on the
  dashboard plugin.

Dashboard surfacing
-------------------

* ⚠ **warning badge** on cards with active hallucination events.
* **attention strip** at the top of the board listing all flagged
  tasks; dismissible per session.
* **events callout** in the task drawer — hallucination events render
  with a red left border, amber icon, and phantom ids as styled chips.
* **recovery section** in the task drawer with three actions: Reclaim,
  Reassign (with profile picker + reclaim-first checkbox), and a
  copy-to-clipboard hint for ``hermes -p <profile> model`` since
  profile config lives on disk and can't be edited from the browser.
  Auto-opens when the task has warnings, collapsed otherwise.
  Keyed by task id so state doesn't leak between drawers.

Active-vs-stale rule: warnings clear when a clean ``completed`` or
``edited`` event supersedes the hallucination, so recovery is never
permanently stigmatising — the audit events persist for debugging but
the badge goes away once the worker succeeds.

Skill updates
-------------

* ``skills/devops/kanban-worker/SKILL.md`` documents the
  ``created_cards`` contract with good/bad examples.
* ``skills/devops/kanban-orchestrator/SKILL.md`` gains a "Recovering
  stuck workers" section with the three actions and when to use each.

Tests
-----

* Kernel gate: verified-cards manifest, phantom rejection + audit
  event, cross-worker rejection, prose scan positive + negative.
* Recovery helpers: reclaim on running task, reclaim on non-running
  returns False, reassign refuses running without reclaim_first,
  reassign with reclaim_first succeeds on running.
* API endpoints: warnings field present on /board and /tasks/:id,
  warnings cleared after clean completion, reclaim 200 + 409 paths,
  reassign 200 + 409 + reclaim_first paths.
* CLI smoke: reclaim + reassign subcommands.

Live-verified end-to-end on a dashboard with seeded scenarios:
attention strip renders, badges land on the right cards, drawer
callout shows phantom chips, Reclaim on a running task flips status to
ready + emits manual reclaimed event + refreshes the drawer,
Reassign swaps the assignee and triggers board refresh.

359/359 kanban-suite tests pass
(test_kanban_{db,cli,boards,core_functionality} + dashboard + tools).
This commit is contained in:
Teknium 2026-05-05 08:06:55 -07:00 committed by GitHub
parent 7de3c86c5a
commit de9238d37e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1791 additions and 17 deletions

View file

@ -308,6 +308,35 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
p_assign.add_argument("task_id")
p_assign.add_argument("profile", help="Profile name (or 'none' to unassign)")
# --- reclaim / reassign (recovery) ---
p_reclaim = sub.add_parser(
"reclaim",
help="Release an active worker claim on a running task",
)
p_reclaim.add_argument("task_id")
p_reclaim.add_argument(
"--reason", default=None,
help="Human-readable reason (recorded on the reclaimed event)",
)
p_reassign = sub.add_parser(
"reassign",
help="Reassign a task to a different profile, optionally reclaiming first",
)
p_reassign.add_argument("task_id")
p_reassign.add_argument(
"profile",
help="New profile name (or 'none' to unassign)",
)
p_reassign.add_argument(
"--reclaim", action="store_true",
help="Release any active claim before reassigning (required if task is running)",
)
p_reassign.add_argument(
"--reason", default=None,
help="Human-readable reason (recorded on the reclaimed event)",
)
# --- link / unlink ---
p_link = sub.add_parser("link", help="Add a parent->child dependency")
p_link.add_argument("parent_id")
@ -597,6 +626,8 @@ def kanban_command(args: argparse.Namespace) -> int:
"ls": _cmd_list,
"show": _cmd_show,
"assign": _cmd_assign,
"reclaim": _cmd_reclaim,
"reassign": _cmd_reassign,
"link": _cmd_link,
"unlink": _cmd_unlink,
"claim": _cmd_claim,
@ -1117,6 +1148,45 @@ def _cmd_assign(args: argparse.Namespace) -> int:
return 0
def _cmd_reclaim(args: argparse.Namespace) -> int:
with kb.connect() as conn:
ok = kb.reclaim_task(
conn, args.task_id,
reason=getattr(args, "reason", None),
)
if not ok:
print(
f"cannot reclaim {args.task_id} (not running or unknown id)",
file=sys.stderr,
)
return 1
print(f"Reclaimed {args.task_id}")
return 0
def _cmd_reassign(args: argparse.Namespace) -> int:
profile = None if args.profile.lower() in ("none", "-", "null") else args.profile
with kb.connect() as conn:
ok = kb.reassign_task(
conn, args.task_id, profile,
reclaim_first=bool(getattr(args, "reclaim", False)),
reason=getattr(args, "reason", None),
)
if not ok:
print(
f"cannot reassign {args.task_id} "
f"(unknown id, or still running — pass --reclaim to release first)",
file=sys.stderr,
)
return 1
print(
f"Reassigned {args.task_id} to "
f"{profile or '(unassigned)'}"
+ (" (claim reclaimed)" if getattr(args, "reclaim", False) else "")
)
return 0
def _cmd_link(args: argparse.Namespace) -> int:
with kb.connect() as conn:
kb.link_tasks(conn, args.parent_id, args.child_id)

View file

@ -1842,6 +1842,212 @@ def release_stale_claims(conn: sqlite3.Connection) -> int:
return reclaimed
def reclaim_task(
conn: sqlite3.Connection,
task_id: str,
*,
reason: Optional[str] = None,
) -> bool:
"""Operator-driven reclaim: release the claim and reset to ``ready``.
Unlike :func:`release_stale_claims` which only acts on tasks whose
``claim_expires`` has passed, this function reclaims immediately
regardless of TTL. Intended for the dashboard/CLI recovery flow
when an operator wants to abort a running worker without waiting
for the TTL to expire (e.g. after seeing a hallucination warning).
Returns True if a reclaim happened, False if the task isn't in a
reclaimable state (not running, or doesn't exist).
"""
with write_txn(conn):
row = conn.execute(
"SELECT status, claim_lock, worker_pid FROM tasks WHERE id = ?",
(task_id,),
).fetchone()
if not row:
return False
if row["status"] != "running" and row["claim_lock"] is None:
# Nothing to reclaim — already ready / blocked / done.
return False
prev_lock = row["claim_lock"]
prev_pid = row["worker_pid"]
conn.execute(
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
"claim_expires = NULL, worker_pid = NULL "
"WHERE id = ? AND status IN ('running', 'ready', 'blocked')",
(task_id,),
)
run_id = _end_run(
conn, task_id,
outcome="reclaimed", status="reclaimed",
error=(
f"manual_reclaim: {reason}" if reason
else f"manual_reclaim lock={prev_lock}"
),
)
_append_event(
conn, task_id, "reclaimed",
{
"manual": True,
"reason": reason,
"prev_lock": prev_lock,
"prev_pid": prev_pid,
},
run_id=run_id,
)
return True
def reassign_task(
conn: sqlite3.Connection,
task_id: str,
profile: Optional[str],
*,
reclaim_first: bool = False,
reason: Optional[str] = None,
) -> bool:
"""Reassign a task, optionally reclaiming a stuck running worker first.
This is the recovery path for "this profile's model is broken, try
a different one". If ``reclaim_first`` is True, any active claim is
released (via :func:`reclaim_task`) before the reassign happens;
otherwise the function refuses to reassign a currently-running task
and returns False (caller can retry with ``reclaim_first=True``).
Returns True if the reassign landed. ``profile`` may be ``None`` to
unassign entirely.
"""
if reclaim_first:
# Safe to call even if nothing to reclaim.
reclaim_task(conn, task_id, reason=reason or "reassign")
# assign_task handles its own txn + the still-running guard.
try:
return assign_task(conn, task_id, profile)
except RuntimeError:
# Task is still running and reclaim_first was False; caller
# needs to decide whether to retry with reclaim.
return False
def _verify_created_cards(
conn: sqlite3.Connection,
completing_task_id: str,
claimed_ids: Iterable[str],
) -> tuple[list[str], list[str]]:
"""Partition ``claimed_ids`` into (verified, phantom).
A card is "verified" iff a row exists in ``tasks`` with the given id
AND ``created_by`` matches the completing task's ``assignee`` (or
the completing task itself workers that create children of their
own task also qualify).
``phantom`` returns ids that either don't exist at all or exist but
were not created by the completing worker. The caller decides what
to do with each bucket; this helper never mutates.
"""
claimed = [str(x).strip() for x in (claimed_ids or []) if str(x).strip()]
if not claimed:
return [], []
# Dedupe while preserving order.
seen: set[str] = set()
ordered: list[str] = []
for cid in claimed:
if cid not in seen:
seen.add(cid)
ordered.append(cid)
row = conn.execute(
"SELECT assignee FROM tasks WHERE id = ?", (completing_task_id,),
).fetchone()
if row is None:
# Completing task not found — nothing resolves.
return [], ordered
completing_assignee = row["assignee"]
# Batch-fetch existence + created_by in one query.
placeholders = ",".join(["?"] * len(ordered))
rows = conn.execute(
f"SELECT id, created_by FROM tasks WHERE id IN ({placeholders})",
tuple(ordered),
).fetchall()
found = {r["id"]: r["created_by"] for r in rows}
verified: list[str] = []
phantom: list[str] = []
for cid in ordered:
created_by = found.get(cid)
if created_by is None:
phantom.append(cid)
continue
# Accept if created_by matches the completing task's assignee
# profile, OR the task itself (workers whose created_by happens
# to match their task id are unusual but harmless to accept).
if completing_assignee and created_by == completing_assignee:
verified.append(cid)
elif created_by == completing_task_id:
verified.append(cid)
else:
phantom.append(cid)
return verified, phantom
# Task-id pattern used both by ``kanban_create`` (``t_<12 hex>``) and
# ``_new_task_id`` below. Kept permissive on length for forward compat:
# accept 8+ hex chars after the ``t_`` prefix.
_TASK_ID_PROSE_RE = re.compile(r"\bt_[a-f0-9]{8,}\b")
def _scan_prose_for_phantom_ids(
conn: sqlite3.Connection,
text: str,
) -> list[str]:
"""Regex-scan free-form text for ``t_<hex>`` references; return the
ones that don't exist in ``tasks``.
Used as a non-blocking advisory check on completion summaries. An
empty return means "no suspicious references found" either the
text had no IDs at all, or every ID it mentioned resolves to a real
task. Duplicates are deduped.
"""
if not text:
return []
matches = _TASK_ID_PROSE_RE.findall(text)
if not matches:
return []
# Dedupe preserving order.
seen: set[str] = set()
unique: list[str] = []
for m in matches:
if m not in seen:
seen.add(m)
unique.append(m)
placeholders = ",".join(["?"] * len(unique))
rows = conn.execute(
f"SELECT id FROM tasks WHERE id IN ({placeholders})",
tuple(unique),
).fetchall()
existing = {r["id"] for r in rows}
return [m for m in unique if m not in existing]
class HallucinatedCardsError(ValueError):
"""Raised by ``complete_task`` when ``created_cards`` contains ids
that don't exist or weren't created by the completing worker.
The phantom list is attached as ``.phantom`` for callers that want
structured access. Kept as ``ValueError`` subclass so existing
tool-error handlers treat it as a recoverable user error.
"""
def __init__(self, phantom: list[str], completing_task_id: str):
self.phantom = list(phantom)
self.completing_task_id = completing_task_id
super().__init__(
f"completion blocked: claimed created_cards that do not exist "
f"or were not created by this worker: {', '.join(phantom)}"
)
def complete_task(
conn: sqlite3.Connection,
task_id: str,
@ -1849,21 +2055,65 @@ def complete_task(
result: Optional[str] = None,
summary: Optional[str] = None,
metadata: Optional[dict] = None,
created_cards: Optional[Iterable[str]] = None,
) -> bool:
"""Transition ``running|ready -> done`` and record ``result``.
Accepts a task that's merely ``ready`` too, so a manual CLI
Accepts a task that is merely ``ready`` too, so a manual CLI
completion (``hermes kanban complete <id>``) works without requiring
a claim/start/complete sequence.
``summary`` and ``metadata`` are stored on the closing run (if any)
and surfaced to downstream children via :func:`build_worker_context`.
When ``summary`` is omitted we fall back to ``result`` so single-run
callers don't have to pass both. ``metadata`` is a free-form dict
callers do not have to pass both. ``metadata`` is a free-form dict
(e.g. ``{"changed_files": [...], "tests_run": [...]}``) workers
are encouraged to use it for structured handoff facts.
``created_cards`` is an optional list of task ids the completing
worker claims to have created. Each id is verified against
``tasks.created_by``. If any id is phantom (does not exist or was
not created by this worker's assignee profile), completion is blocked
with a ``HallucinatedCardsError`` and a
``completion_blocked_hallucination`` event is emitted so the rejected
attempt is auditable. When all ids verify, they are recorded on the
``completed`` event payload.
After a successful completion, ``summary`` and ``result`` are scanned
for prose references like ``t_deadbeefcafe`` that do not resolve.
Any suspected phantom references are recorded as a
``suspected_hallucinated_references`` event. This pass is advisory
and never blocks.
"""
now = int(time.time())
# Gate: verify created_cards BEFORE the main write txn. A rejected
# completion still needs an auditable event, so we emit it in a
# tiny dedicated txn, then raise. The caller is responsible for
# surfacing HallucinatedCardsError to the worker; this function
# never mutates task state on a phantom-card rejection.
if created_cards:
verified_cards, phantom_cards = _verify_created_cards(
conn, task_id, created_cards
)
if phantom_cards:
with write_txn(conn):
_append_event(
conn, task_id, "completion_blocked_hallucination",
{
"phantom_cards": phantom_cards,
"verified_cards": verified_cards,
"summary_preview": (
(summary or result or "").strip().splitlines()[0][:200]
if (summary or result)
else None
),
},
)
raise HallucinatedCardsError(phantom_cards, task_id)
else:
verified_cards = []
with write_txn(conn):
cur = conn.execute(
"""
@ -1904,14 +2154,38 @@ def complete_task(
# full summary stays on the run row.
ev_summary = (summary if summary is not None else result) or ""
ev_summary = ev_summary.strip().splitlines()[0][:400] if ev_summary else ""
completed_payload: dict = {
"result_len": len(result) if result else 0,
"summary": ev_summary or None,
}
if verified_cards:
completed_payload["verified_cards"] = verified_cards
_append_event(
conn, task_id, "completed",
{
"result_len": len(result) if result else 0,
"summary": ev_summary or None,
},
completed_payload,
run_id=run_id,
)
# Prose-scan the summary + result for t_<hex> references that do
# not resolve. Advisory — does not block the completion. Runs in
# its own txn so the completion itself is already durable by the
# time we emit the warning.
scan_text = " ".join(filter(None, [summary, result]))
if scan_text:
phantom_refs = _scan_prose_for_phantom_ids(conn, scan_text)
# Drop any phantom refs that were already flagged as verified
# above (shouldn't happen — verified means they exist — but
# belt-and-suspenders).
phantom_refs = [p for p in phantom_refs if p not in set(verified_cards)]
if phantom_refs:
with write_txn(conn):
_append_event(
conn, task_id, "suspected_hallucinated_references",
{
"phantom_refs": phantom_refs,
"source": "completion_summary",
},
run_id=run_id,
)
# Recompute ready status for dependents (separate txn so children see done).
recompute_ready(conn)
return True