mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 02:11:48 +00:00
Salvage of PR #16100 onto current main (after emozilla's #17514 fix that unblocks plugin Pydantic body validation). History preserved on the standing `feat/kanban-standing` branch; this squashes the 22 iterative commits into one clean landing. What this lands: - SQLite kernel (hermes_cli/kanban_db.py) — durable task board with tasks, task_links, task_runs, task_comments, task_events, kanban_notify_subs tables. WAL mode, atomic claim via CAS, tenant-namespaced, skills JSON array per task, max-runtime timeouts, worker heartbeats, idempotency keys, circuit breaker on repeated spawn failures, crash detection via /proc/<pid>/status, run history preserved across attempts. - Dispatcher — runs inside the gateway by default (`kanban.dispatch_in_gateway: true`). Ticks every 60s, reclaims stale claims, promotes ready tasks, spawns `hermes -p <assignee> chat -q "work kanban task <id>"` with HERMES_KANBAN_TASK + HERMES_KANBAN_WORKSPACE env. Auto-loads `--skills kanban-worker` plus any per-task skills. Health telemetry warns on stuck ready queue. - Structured tool surface (tools/kanban_tools.py) — 7 tools (kanban_show, kanban_complete, kanban_block, kanban_heartbeat, kanban_comment, kanban_create, kanban_link). Gated on HERMES_KANBAN_TASK via check_fn so zero schema footprint in normal sessions. - System-prompt guidance (agent/prompt_builder.py KANBAN_GUIDANCE) injected only when kanban tools are active. - Dashboard plugin (plugins/kanban/dashboard/) — Linear-style board UI: triage/todo/ready/running/blocked/done columns, drag-drop, inline create, task drawer with markdown, comments, run history, dependency editor, bulk ops, lanes-by-profile grouping, WS-driven live refresh. Matches active dashboard theme via CSS variables. - CLI — `hermes kanban init|create|list|show|assign|link|unlink| claim|comment|complete|block|unblock|archive|tail|dispatch|context| init|gc|watch|stats|notify|log|heartbeat|runs|assignees` + `/kanban` slash in-session. 
- Worker + orchestrator skills (skills/devops/kanban-worker + kanban-orchestrator) — pattern library for good summary/metadata shapes, retry diagnostics, block-reason examples, fan-out patterns. - Per-task force-loaded skills — `--skill <name>` (repeatable), stored as JSON, threaded through to dispatcher argv as one `--skills X` pair per skill alongside the built-in kanban-worker. Dashboard + CLI + tool parity. - Deprecation of standalone `hermes kanban daemon` — stub exits 2 with migration guidance; `--force` escape hatch for headless hosts. - Docs (website/docs/user-guide/features/kanban.md + kanban-tutorial.md) with 11 dashboard screenshots walking through four user stories (Solo Dev, Fleet Farming, Role Pipeline, Circuit Breaker). - Tests (251 passing): kernel schema + migration + CAS atomicity, dispatcher logic, circuit breaker, crash detection, max-runtime timeouts, claim lifecycle, tenant isolation, idempotency keys, per-task skills round-trip + validation + dispatcher argv, tool surface (7 tools × round-trip + error paths), dashboard REST (CRUD + bulk + links + warnings), gateway-embedded dispatcher (config gate, env override, graceful shutdown), CLI deprecation stub, migration from legacy schemas. Gateway integration: - GatewayRunner._kanban_dispatcher_watcher — new asyncio background task, symmetric with _kanban_notifier_watcher. Runs dispatch_once via asyncio.to_thread so SQLite WAL never blocks the loop. Sleeps in 1s slices for snappy shutdown. Respects HERMES_KANBAN_DISPATCH_IN_GATEWAY=0 env override for debugging. - Config: new `kanban` section in DEFAULT_CONFIG with `dispatch_in_gateway: true` (default) + `dispatch_interval_seconds: 60`. Additive — no _config_version bump needed. Forward-compat: - workflow_template_id / current_step_key columns on tasks (v1 writes NULL; v2 will use them for routing). - task_runs holds claim machinery (claim_lock, claim_expires, worker_pid, last_heartbeat_at) so multi-attempt history is first-class from day one. 
Closes #16102. Co-authored-by: emozilla <emozilla@nousresearch.com>
438 lines
16 KiB
Python
438 lines
16 KiB
Python
"""Tests for the Kanban DB layer (hermes_cli.kanban_db)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import concurrent.futures
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from hermes_cli import kanban_db as kb
|
|
|
|
|
|
@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Provide a sandboxed HERMES_HOME holding a freshly initialised kanban DB.

    ``HERMES_HOME`` is set to ``<tmp_path>/.hermes`` and ``Path.home`` is
    patched to return ``tmp_path``. The ``.hermes`` directory is returned.
    """
    hermes_dir = tmp_path / ".hermes"
    hermes_dir.mkdir()

    # Both redirections must be in place before the DB is initialised.
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    monkeypatch.setenv("HERMES_HOME", str(hermes_dir))

    kb.init_db()
    return hermes_dir
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema / init
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_init_db_is_idempotent(kanban_home):
    """A repeated ``init_db`` call must not raise and must keep existing rows."""
    with kb.connect() as db:
        kb.create_task(db, title="persisted")

    # Re-run initialisation on top of the already-populated database.
    kb.init_db()

    with kb.connect() as db:
        surviving = kb.list_tasks(db)

    assert [task.title for task in surviving] == ["persisted"]
|
|
|
|
|
|
def test_init_creates_expected_tables(kanban_home):
    """The core kanban tables are present in ``sqlite_master`` after init."""
    required = {"tasks", "task_links", "task_comments", "task_events"}
    query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"

    with kb.connect() as db:
        present = {row["name"] for row in db.execute(query).fetchall()}

    # Extra tables are allowed; only the required ones are checked.
    assert required.issubset(present)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Task creation + status inference
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_create_task_no_parents_is_ready(kanban_home):
    """A parentless task starts ``ready`` with a ``scratch`` workspace."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="ship it", assignee="alice")
        created = kb.get_task(db, task_id)

    assert created is not None
    observed = (created.status, created.assignee, created.workspace_kind)
    assert observed == ("ready", "alice", "scratch")
|
|
|
|
|
|
def test_create_task_with_parent_is_todo_until_parent_done(kanban_home):
    """A child stays ``todo`` until its parent is completed, then turns ``ready``."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="parent")
        child_id = kb.create_task(db, title="child", parents=[parent_id])

        status_before = kb.get_task(db, child_id).status
        assert status_before == "todo"

        kb.complete_task(db, parent_id, result="ok")

        status_after = kb.get_task(db, child_id).status
        assert status_after == "ready"
|
|
|
|
|
|
def test_create_task_unknown_parent_errors(kanban_home):
    """Referencing a parent id that does not exist raises ``ValueError``."""
    with kb.connect() as db:
        with pytest.raises(ValueError, match="unknown parent"):
            kb.create_task(db, title="orphan", parents=["t_ghost"])
|
|
|
|
|
|
def test_workspace_kind_validation(kanban_home):
    """An unsupported ``workspace_kind`` value is rejected with ``ValueError``."""
    with kb.connect() as db:
        with pytest.raises(ValueError, match="workspace_kind"):
            kb.create_task(db, title="bad ws", workspace_kind="cloud")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Links + dependency resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_link_demotes_ready_child_to_todo_when_parent_not_done(kanban_home):
    """Linking a ready task under an unfinished parent moves it back to ``todo``."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="a")
        child_id = kb.create_task(db, title="b")

        def child_status():
            return kb.get_task(db, child_id).status

        assert child_status() == "ready"
        kb.link_tasks(db, parent_id, child_id)
        assert child_status() == "todo"
|
|
|
|
|
|
def test_link_keeps_ready_child_when_parent_already_done(kanban_home):
    """Linking under an already-completed parent leaves the child ``ready``."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="a")
        kb.complete_task(db, parent_id)
        child_id = kb.create_task(db, title="b")

        def child_status():
            return kb.get_task(db, child_id).status

        assert child_status() == "ready"
        kb.link_tasks(db, parent_id, child_id)
        assert child_status() == "ready"
|
|
|
|
|
|
def test_link_rejects_self_loop(kanban_home):
    """A task cannot be linked as its own parent."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="a")
        with pytest.raises(ValueError, match="itself"):
            kb.link_tasks(db, task_id, task_id)
|
|
|
|
|
|
def test_link_detects_cycle(kanban_home):
    """Links that would close a loop in the a -> b -> c chain are refused."""
    with kb.connect() as db:
        first = kb.create_task(db, title="a")
        second = kb.create_task(db, title="b", parents=[first])
        third = kb.create_task(db, title="c", parents=[second])

        # Try the long back-edge (c -> a) first, then the short one (b -> a).
        for descendant in (third, second):
            with pytest.raises(ValueError, match="cycle"):
                kb.link_tasks(db, descendant, first)
|
|
|
|
|
|
def test_recompute_ready_cascades_through_chain(kanban_home):
    """Completing each link of a chain promotes only the next task to ``ready``."""
    with kb.connect() as db:
        head = kb.create_task(db, title="a")
        middle = kb.create_task(db, title="b", parents=[head])
        tail = kb.create_task(db, title="c", parents=[middle])

        initial = [kb.get_task(db, tid).status for tid in (head, middle, tail)]
        assert initial == ["ready", "todo", "todo"]

        kb.complete_task(db, head)
        assert kb.get_task(db, middle).status == "ready"

        kb.complete_task(db, middle)
        assert kb.get_task(db, tail).status == "ready"
|
|
|
|
|
|
def test_recompute_ready_fan_in_waits_for_all_parents(kanban_home):
    """A task with two parents becomes ``ready`` only after both are completed."""
    with kb.connect() as db:
        left = kb.create_task(db, title="a")
        right = kb.create_task(db, title="b")
        joined = kb.create_task(db, title="c", parents=[left, right])

        # One parent done is not enough.
        kb.complete_task(db, left)
        assert kb.get_task(db, joined).status == "todo"

        kb.complete_task(db, right)
        assert kb.get_task(db, joined).status == "ready"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Atomic claim (CAS)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_claim_once_wins_second_loses(kanban_home):
    """The first claim moves the task to ``running``; a second claim returns ``None``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")

        winner = kb.claim_task(db, task_id, claimer="host:1")
        assert winner is not None
        assert winner.status == "running"

        loser = kb.claim_task(db, task_id, claimer="host:2")
        assert loser is None
|
|
|
|
|
|
def test_claim_fails_on_non_ready(kanban_home):
    """A task that is not ``ready`` cannot be claimed."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")

        # Adding an unfinished parent pushes the task back to "todo".
        blocker_id = kb.create_task(db, title="p")
        kb.link_tasks(db, blocker_id, task_id)
        assert kb.get_task(db, task_id).status == "todo"

        claim = kb.claim_task(db, task_id)
        assert claim is None
|
|
|
|
|
|
def test_stale_claim_reclaimed(kanban_home):
    """An expired claim is released and the task returns to ``ready``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)

        # Backdate the expiry by an hour so the claim counts as stale.
        an_hour_ago = int(time.time()) - 3600
        db.execute(
            "UPDATE tasks SET claim_expires = ? WHERE id = ?",
            (an_hour_ago, task_id),
        )

        released = kb.release_stale_claims(db)
        assert released == 1
        assert kb.get_task(db, task_id).status == "ready"
|
|
|
|
|
|
def test_heartbeat_extends_claim(kanban_home):
    """A heartbeat from the claimer pushes ``claim_expires`` into the future.

    The task is claimed with a 60 second TTL, its expiry is forced to 0, and a
    heartbeat with a 3600 second TTL is sent. The new expiry must be well ahead
    of the current time and later than the expiry recorded at claim time.
    """
    with kb.connect() as conn:
        t = kb.create_task(conn, title="x", assignee="a")
        claimer = "host:hb"
        kb.claim_task(conn, t, claimer=claimer, ttl_seconds=60)
        original = kb.get_task(conn, t).claim_expires

        # Rewind then heartbeat.
        conn.execute("UPDATE tasks SET claim_expires = ? WHERE id = ?", (0, t))
        ok = kb.heartbeat_claim(conn, t, claimer=claimer, ttl_seconds=3600)
        assert ok

        new = kb.get_task(conn, t).claim_expires
        assert new > int(time.time()) + 3000
        # The expiry captured at claim time was previously never compared.
        assert new > original
|
|
|
|
|
|
def test_concurrent_claims_only_one_wins(kanban_home):
    """Race several threads for the same task; exactly one claim may succeed."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="race", assignee="a")

    def try_claim(worker_no):
        # Every thread opens its own connection for the claim attempt.
        with kb.connect() as worker_db:
            return kb.claim_task(worker_db, task_id, claimer=f"host:{worker_no}")

    thread_count = 8
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as pool:
        pending = [pool.submit(try_claim, n) for n in range(thread_count)]
        outcomes = [future.result() for future in pending]

    successful = [claim for claim in outcomes if claim is not None]
    assert len(successful) == 1
    assert successful[0].status == "running"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Complete / block / unblock / archive / assign
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_complete_records_result(kanban_home):
    """Completing a task stores the result text and a completion timestamp."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")

        completed = kb.complete_task(db, task_id, result="done and dusted")
        assert completed

        finished = kb.get_task(db, task_id)
        assert (finished.status, finished.result) == ("done", "done and dusted")
        assert finished.completed_at is not None
|
|
|
|
|
|
def test_block_then_unblock(kanban_home):
    """A claimed task can be blocked, and unblocking returns it to ``ready``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)

        blocked = kb.block_task(db, task_id, reason="need input")
        assert blocked
        assert kb.get_task(db, task_id).status == "blocked"

        unblocked = kb.unblock_task(db, task_id)
        assert unblocked
        assert kb.get_task(db, task_id).status == "ready"
|
|
|
|
|
|
def test_assign_refuses_while_running(kanban_home):
    """Reassigning a task that has been claimed raises ``RuntimeError``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)

        with pytest.raises(RuntimeError, match="currently running"):
            kb.assign_task(db, task_id, "b")
|
|
|
|
|
|
def test_assign_reassigns_when_not_running(kanban_home):
    """An unclaimed task can be handed to a different assignee."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")

        reassigned = kb.assign_task(db, task_id, "b")
        assert reassigned

        current_owner = kb.get_task(db, task_id).assignee
        assert current_owner == "b"
|
|
|
|
|
|
def test_archive_hides_from_default_list(kanban_home):
    """Archived tasks are listed only when ``include_archived=True`` is passed."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        kb.complete_task(db, task_id)

        archived = kb.archive_task(db, task_id)
        assert archived

        default_view = kb.list_tasks(db)
        full_view = kb.list_tasks(db, include_archived=True)
        assert len(default_view) == 0
        assert len(full_view) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Comments / events / worker context
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_comments_recorded_in_order(kanban_home):
    """Comments are listed in insertion order with author and body intact."""
    posted = [("user", "first"), ("researcher", "second")]

    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        for author, body in posted:
            kb.add_comment(db, task_id, author, body)
        listed = kb.list_comments(db, task_id)

    assert [entry.body for entry in listed] == ["first", "second"]
    assert [entry.author for entry in listed] == ["user", "researcher"]
|
|
|
|
|
|
def test_empty_comment_rejected(kanban_home):
    """An empty comment body is refused with ``ValueError``."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        with pytest.raises(ValueError, match="body is required"):
            kb.add_comment(db, task_id, "user", "")
|
|
|
|
|
|
def test_events_capture_lifecycle(kanban_home):
    """Create, claim and complete each leave an event of the matching kind."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="a")
        kb.claim_task(db, task_id)
        kb.complete_task(db, task_id, result="ok")
        recorded_kinds = [event.kind for event in kb.list_events(db, task_id)]

    # Checked one at a time so a failure points at the missing kind.
    for expected_kind in ("created", "claimed", "completed"):
        assert expected_kind in recorded_kinds
|
|
|
|
|
|
def test_worker_context_includes_parent_results_and_comments(kanban_home):
    """Worker context contains the parent result, comments, task id and title."""
    with kb.connect() as db:
        parent_id = kb.create_task(db, title="p")
        kb.complete_task(db, parent_id, result="PARENT_RESULT_MARKER")

        child_id = kb.create_task(db, title="child", parents=[parent_id])
        kb.add_comment(db, child_id, "user", "CLARIFICATION_MARKER")

        context = kb.build_worker_context(db, child_id)

    expected_fragments = (
        "PARENT_RESULT_MARKER",
        "CLARIFICATION_MARKER",
        child_id,
        "child",
    )
    for fragment in expected_fragments:
        assert fragment in context
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dispatcher
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_dispatch_dry_run_does_not_claim(kanban_home):
    """A dry-run dispatch reports spawn candidates but leaves them ``ready``."""
    with kb.connect() as db:
        alice_task = kb.create_task(db, title="a", assignee="alice")
        bob_task = kb.create_task(db, title="b", assignee="bob")
        outcome = kb.dispatch_once(db, dry_run=True)

    reported_ids = {entry[0] for entry in outcome.spawned}
    assert reported_ids == {alice_task, bob_task}

    # Re-read through a fresh connection: the dry run must not have changed status.
    with kb.connect() as db:
        for task_id in (alice_task, bob_task):
            assert kb.get_task(db, task_id).status == "ready"
|
|
|
|
|
|
def test_dispatch_skips_unassigned(kanban_home):
    """A task without an assignee is reported as skipped and never spawned."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="floater")
        outcome = kb.dispatch_once(db, dry_run=True)

    assert task_id in outcome.skipped_unassigned
    assert not outcome.spawned
|
|
|
|
|
|
def test_dispatch_promotes_ready_and_spawns(kanban_home):
    """Dispatch spawns the child of a completed parent and marks it ``running``.

    ``fake_spawn`` stands in for the real worker launcher and records what it
    was called with, so the test can check that only the child was spawned.
    """
    spawns = []

    def fake_spawn(task, workspace):
        spawns.append((task.id, task.assignee, workspace))

    with kb.connect() as conn:
        p = kb.create_task(conn, title="p", assignee="alice")
        c = kb.create_task(conn, title="c", assignee="bob", parents=[p])
        # Finish the parent before dispatching so only the child is left to run.
        kb.complete_task(conn, p)
        # The return value is not inspected; the recorded spawns are checked instead.
        kb.dispatch_once(conn, spawn_fn=fake_spawn)

    # Spawned c only (p was already done when dispatch was called).
    assert len(spawns) == 1
    assert spawns[0][0] == c
    assert spawns[0][1] == "bob"

    # c is now running
    with kb.connect() as conn:
        assert kb.get_task(conn, c).status == "running"
|
|
|
|
|
|
def test_dispatch_spawn_failure_releases_claim(kanban_home):
    """If the spawn callable raises, the task goes back to ``ready`` unclaimed."""

    def failing_spawn(task, workspace):
        raise RuntimeError("spawn failed")

    with kb.connect() as db:
        task_id = kb.create_task(db, title="boom", assignee="alice")
        kb.dispatch_once(db, spawn_fn=failing_spawn)

        # The task has to be claimable again on a later dispatch.
        status_after = kb.get_task(db, task_id).status
        lock_after = kb.get_task(db, task_id).claim_lock
        assert status_after == "ready"
        assert lock_after is None
|
|
|
|
|
|
def test_dispatch_reclaims_stale_before_spawning(kanban_home):
    """A dispatch pass reports expired claims in its ``reclaimed`` count."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x", assignee="alice")
        kb.claim_task(db, task_id)

        # Put the expiry one second in the past so the claim is stale.
        just_expired = int(time.time()) - 1
        db.execute(
            "UPDATE tasks SET claim_expires = ? WHERE id = ?",
            (just_expired, task_id),
        )

        outcome = kb.dispatch_once(db, dry_run=True)

    assert outcome.reclaimed == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Workspace resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_scratch_workspace_created_under_hermes_home(kanban_home):
    """Resolving a default task's workspace yields an existing kanban directory."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="x")
        record = kb.get_task(db, task_id)

    workspace = kb.resolve_workspace(record)

    assert workspace.exists()
    assert workspace.is_dir()
    assert "kanban" in str(workspace)
|
|
|
|
|
|
def test_dir_workspace_honors_given_path(kanban_home, tmp_path):
    """A ``dir`` workspace resolves to exactly the given path, which then exists."""
    requested_dir = tmp_path / "my-vault"

    with kb.connect() as db:
        task_id = kb.create_task(
            db,
            title="biz",
            workspace_kind="dir",
            workspace_path=str(requested_dir),
        )
        record = kb.get_task(db, task_id)

    workspace = kb.resolve_workspace(record)

    assert workspace == requested_dir
    assert workspace.exists()
|
|
|
|
|
|
def test_worktree_workspace_returns_intended_path(kanban_home, tmp_path):
    """A ``worktree`` workspace resolves to the configured path string."""
    requested_path = str(tmp_path / ".worktrees" / "my-task")

    with kb.connect() as db:
        task_id = kb.create_task(
            db,
            title="ship",
            workspace_kind="worktree",
            workspace_path=requested_path,
        )
        record = kb.get_task(db, task_id)

    workspace = kb.resolve_workspace(record)

    # Worktrees are not auto-created (the worker's skill does that), so only
    # the resolved path is compared.
    assert str(workspace) == requested_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tenancy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_tenant_column_filters_listings(kanban_home):
    """Listing by tenant returns only that tenant's tasks."""
    with kb.connect() as db:
        kb.create_task(db, title="a1", tenant="biz-a")
        kb.create_task(db, title="b1", tenant="biz-b")
        kb.create_task(db, title="shared")  # created without a tenant

        titles_for_a = [task.title for task in kb.list_tasks(db, tenant="biz-a")]
        titles_for_b = [task.title for task in kb.list_tasks(db, tenant="biz-b")]

    assert titles_for_a == ["a1"]
    assert titles_for_b == ["b1"]
|
|
|
|
|
|
def test_tenant_propagates_to_events(kanban_home):
    """The ``created`` event of a tenant task carries the tenant in its payload."""
    with kb.connect() as db:
        task_id = kb.create_task(db, title="tenant-task", tenant="biz-a")
        history = kb.list_events(db, task_id)

    creation_events = [event for event in history if event.kind == "created"]

    # There must be a "created" event, and the first one must name the tenant.
    assert creation_events
    assert creation_events[0].payload.get("tenant") == "biz-a"
|