feat(kanban): worker visibility endpoints (workers/active, runs/{id}, inspect)

Adds three read-only endpoints to the kanban dashboard plugin so the
SwitchUI workspace (and any other dashboard consumer) can track
workers across tasks without N+1 round-trips through /tasks/{task_id}.

- GET /workers/active
  Single SQL JOIN of task_runs + tasks where ended_at IS NULL,
  worker_pid IS NOT NULL, status='running'. Returns
  {workers: [...], count, checked_at}.

- GET /runs/{run_id}
  Direct lookup of any task_run row by id. Reuses existing
  kanban_db.get_run() helper and _run_dict() serialiser. 404 when
  not found. Mirrors GET /tasks/{task_id} 404 shape.

- GET /runs/{run_id}/inspect
  Live PID stats via psutil.Process.as_dict() — cpu_percent,
  memory_rss_bytes, memory_vms_bytes, num_threads, num_fds, status,
  create_time, cmdline. Short-circuits with alive:false when run
  has ended, has no worker_pid, the pid is gone, or psutil is
  unavailable. AccessDenied surfaces as alive:true with error
  rather than a 500.

11 new tests in tests/plugins/test_kanban_worker_runs.py cover the
empty-board case, running-task case, ended-run filtering,
missing-pid filtering, 404 paths, already-ended inspect, no-pid
inspect, dead-pid inspect, and live-pid inspect (psutil mocked).
All pass.

Companion termination endpoint (POST /runs/{run_id}/terminate) is
intentionally out of scope here — opening a separate issue first
since the RBAC and dispatcher-mediated soft-cancel design needs
maintainer input before code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Interstellar-code 2026-05-18 21:01:42 -07:00 committed by Teknium
parent b65dfbb453
commit 02efad704f
2 changed files with 463 additions and 0 deletions

View file

@ -1050,6 +1050,168 @@ def list_diagnostics(
conn.close()
# ---------------------------------------------------------------------------
# Worker visibility — cross-task active-worker list and per-run inspection
# ---------------------------------------------------------------------------
try:
import psutil as _psutil
except ImportError:
_psutil = None # type: ignore[assignment]
@router.get("/workers/active")
def list_active_workers(
board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"),
):
"""Return every currently-running worker on the board.
A worker is a ``task_runs`` row whose ``ended_at`` is NULL and whose
``worker_pid`` is non-NULL, belonging to a task with ``status='running'``.
Returns ``{workers: [...], count: N, checked_at: <epoch>}``. Each
worker entry carries enough context for the dashboard to link back to
its task without a second round-trip.
"""
board = _resolve_board(board)
conn = _conn(board=board)
try:
rows = conn.execute(
"""
SELECT
r.id AS run_id,
r.task_id,
t.title AS task_title,
t.status AS task_status,
t.assignee AS task_assignee,
r.profile,
r.worker_pid,
r.started_at,
r.claim_lock,
r.claim_expires,
r.last_heartbeat_at,
r.max_runtime_seconds
FROM task_runs r
JOIN tasks t ON t.id = r.task_id
WHERE r.ended_at IS NULL
AND r.worker_pid IS NOT NULL
AND t.status = 'running'
ORDER BY r.started_at ASC
""",
).fetchall()
workers = [
{
"run_id": row["run_id"],
"task_id": row["task_id"],
"task_title": row["task_title"],
"task_status": row["task_status"],
"task_assignee": row["task_assignee"],
"profile": row["profile"],
"worker_pid": row["worker_pid"],
"started_at": row["started_at"],
"claim_lock": row["claim_lock"],
"claim_expires": row["claim_expires"],
"last_heartbeat_at": row["last_heartbeat_at"],
"max_runtime_seconds": row["max_runtime_seconds"],
}
for row in rows
]
return {"workers": workers, "count": len(workers), "checked_at": int(time.time())}
finally:
conn.close()
@router.get("/runs/{run_id}")
def get_run_endpoint(
run_id: int,
board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"),
):
"""Direct lookup of a ``task_runs`` row by its integer id.
Returns ``{run: {...}}`` using the same serialisation as the
per-task run history embedded in ``GET /tasks/{task_id}``.
404 when no such run exists.
"""
board = _resolve_board(board)
conn = _conn(board=board)
try:
r = kanban_db.get_run(conn, run_id)
if r is None:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
return {"run": _run_dict(r)}
finally:
conn.close()
@router.get("/runs/{run_id}/inspect")
def inspect_run_endpoint(
run_id: int,
board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"),
):
"""Live PID stats for a run's worker process via psutil.
If the run has already ended, or has no recorded ``worker_pid``,
returns ``{alive: false}`` with a human-readable ``reason``.
When the process is live, returns CPU, memory, thread count, fd count,
status, create_time, and cmdline. ``access_denied`` is set when the
OS refuses inspection rather than raising a 500.
psutil availability: if psutil is not installed the endpoint still
works but ``alive`` is always returned as ``false`` with
``reason="psutil not available"``.
"""
board = _resolve_board(board)
conn = _conn(board=board)
try:
r = kanban_db.get_run(conn, run_id)
if r is None:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
finally:
conn.close()
if r.ended_at is not None:
return {"run_id": run_id, "alive": False, "reason": "run already ended"}
if r.worker_pid is None:
return {"run_id": run_id, "alive": False, "reason": "no worker_pid recorded"}
pid = r.worker_pid
if _psutil is None:
return {"run_id": run_id, "alive": False, "pid": pid, "reason": "psutil not available"}
try:
proc = _psutil.Process(pid)
info = proc.as_dict(attrs=[
"cpu_percent", "memory_info", "num_threads",
"status", "create_time", "cmdline",
])
# num_fds is POSIX-only; skip gracefully on Windows.
try:
num_fds = proc.num_fds()
except AttributeError:
num_fds = None
mem = info.get("memory_info")
return {
"run_id": run_id,
"alive": True,
"pid": pid,
"cpu_percent": info.get("cpu_percent"),
"memory_rss_bytes": mem.rss if mem else None,
"memory_vms_bytes": mem.vms if mem else None,
"num_threads": info.get("num_threads"),
"num_fds": num_fds,
"status": info.get("status"),
"create_time": info.get("create_time"),
"cmdline": info.get("cmdline"),
}
except _psutil.NoSuchProcess:
return {"run_id": run_id, "alive": False, "pid": pid, "reason": "process not found"}
except _psutil.AccessDenied:
return {"run_id": run_id, "alive": True, "pid": pid, "error": "access denied"}
# ---------------------------------------------------------------------------
# Recovery actions — reclaim a running claim, reassign to a new profile
# ---------------------------------------------------------------------------

View file

@ -0,0 +1,301 @@
"""Tests for kanban worker/runs read endpoints.
Covers:
GET /workers/active
GET /runs/{run_id}
GET /runs/{run_id}/inspect
"""
from __future__ import annotations
import importlib.util
import secrets
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from hermes_cli import kanban_db as kb
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _load_plugin_router():
"""Dynamically load plugins/kanban/dashboard/plugin_api.py and return its router."""
repo_root = Path(__file__).resolve().parents[2]
plugin_file = repo_root / "plugins" / "kanban" / "dashboard" / "plugin_api.py"
assert plugin_file.exists(), f"plugin file missing: {plugin_file}"
mod_name = "hermes_dashboard_plugin_kanban_worker_runs_test"
# Re-use a cached module if already loaded to avoid duplicate-router issues.
if mod_name in sys.modules:
return sys.modules[mod_name].router
spec = importlib.util.spec_from_file_location(mod_name, plugin_file)
assert spec is not None and spec.loader is not None
mod = importlib.util.module_from_spec(spec)
sys.modules[mod_name] = mod
spec.loader.exec_module(mod)
return mod.router
@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
"""Isolated HERMES_HOME with an empty kanban DB."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
kb.init_db()
return home
@pytest.fixture
def client(kanban_home):
app = FastAPI()
app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
return TestClient(app)
def _insert_run(conn, task_id, *, worker_pid=None, ended_at=None):
"""Insert a task_runs row directly (bypassing claim machinery) and return run_id."""
lock = secrets.token_hex(8)
future = int(time.time()) + 3600
cur = conn.execute(
"INSERT INTO task_runs "
"(task_id, status, claim_lock, claim_expires, worker_pid, started_at, ended_at) "
"VALUES (?, 'running', ?, ?, ?, ?, ?)",
(task_id, lock, future, worker_pid, int(time.time()), ended_at),
)
conn.commit()
return cur.lastrowid
# ---------------------------------------------------------------------------
# GET /workers/active
# ---------------------------------------------------------------------------
def test_workers_active_empty_board(client):
"""Board with no running tasks returns an empty workers list."""
r = client.get("/api/plugins/kanban/workers/active")
assert r.status_code == 200
body = r.json()
assert body["workers"] == []
assert body["count"] == 0
assert "checked_at" in body
def test_workers_active_with_running_task(client):
"""A running task with an open run row and worker_pid appears in the list."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="active-worker", assignee="alice")
conn.execute(
"UPDATE tasks SET status='running' WHERE id=?", (task_id,),
)
_insert_run(conn, task_id, worker_pid=12345)
finally:
conn.close()
r = client.get("/api/plugins/kanban/workers/active")
assert r.status_code == 200
body = r.json()
assert body["count"] == 1
w = body["workers"][0]
assert w["task_id"] == task_id
assert w["worker_pid"] == 12345
assert w["task_status"] == "running"
assert w["task_title"] == "active-worker"
assert w["task_assignee"] == "alice"
def test_workers_active_excludes_ended_runs(client):
"""Runs with ended_at set are excluded even if task is running."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="ended-run", assignee="bob")
conn.execute("UPDATE tasks SET status='running' WHERE id=?", (task_id,))
_insert_run(conn, task_id, worker_pid=99999, ended_at=int(time.time()) - 60)
finally:
conn.close()
r = client.get("/api/plugins/kanban/workers/active")
assert r.status_code == 200
assert r.json()["count"] == 0
def test_workers_active_excludes_runs_without_pid(client):
"""Runs with no worker_pid are not considered active workers."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="no-pid", assignee="carol")
conn.execute("UPDATE tasks SET status='running' WHERE id=?", (task_id,))
_insert_run(conn, task_id, worker_pid=None)
finally:
conn.close()
r = client.get("/api/plugins/kanban/workers/active")
assert r.status_code == 200
assert r.json()["count"] == 0
# ---------------------------------------------------------------------------
# GET /runs/{run_id}
# ---------------------------------------------------------------------------
def test_get_run_404_unknown_id(client):
"""Non-existent run_id returns 404."""
r = client.get("/api/plugins/kanban/runs/999999")
assert r.status_code == 404
assert "999999" in r.json()["detail"]
def test_get_run_ok(client):
"""Existing run row returns 200 with expected shape."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="run-lookup", assignee="dave")
run_id = _insert_run(conn, task_id, worker_pid=55555)
finally:
conn.close()
r = client.get(f"/api/plugins/kanban/runs/{run_id}")
assert r.status_code == 200
body = r.json()
assert "run" in body
run = body["run"]
assert run["id"] == run_id
assert run["task_id"] == task_id
assert run["worker_pid"] == 55555
assert run["ended_at"] is None
# ---------------------------------------------------------------------------
# GET /runs/{run_id}/inspect
# ---------------------------------------------------------------------------
def test_inspect_run_404(client):
"""Non-existent run_id returns 404."""
r = client.get("/api/plugins/kanban/runs/888888/inspect")
assert r.status_code == 404
def test_inspect_run_already_ended(client):
"""Run with ended_at set returns alive=false with reason."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="ended", assignee="eve")
run_id = _insert_run(conn, task_id, worker_pid=11111, ended_at=int(time.time()) - 10)
finally:
conn.close()
r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect")
assert r.status_code == 200
body = r.json()
assert body["alive"] is False
assert "ended" in body["reason"]
def test_inspect_run_no_pid(client):
"""Run with no worker_pid returns alive=false with reason."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="no-pid-inspect", assignee="frank")
run_id = _insert_run(conn, task_id, worker_pid=None)
finally:
conn.close()
r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect")
assert r.status_code == 200
body = r.json()
assert body["alive"] is False
assert "worker_pid" in body["reason"]
def test_inspect_run_dead_pid(client, monkeypatch):
"""Run with a non-existent PID returns alive=false via psutil.NoSuchProcess."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="dead-pid", assignee="grace")
run_id = _insert_run(conn, task_id, worker_pid=999999)
finally:
conn.close()
# Mock psutil to raise NoSuchProcess for any PID.
mock_psutil = MagicMock()
mock_psutil.NoSuchProcess = Exception
mock_psutil.AccessDenied = PermissionError
def _raise_no_such(*args, **kwargs):
raise mock_psutil.NoSuchProcess("no such process")
mock_psutil.Process = _raise_no_such
# Patch the module-level _psutil in the loaded plugin module.
plugin_mod_name = "hermes_dashboard_plugin_kanban_worker_runs_test"
plugin_mod = sys.modules.get(plugin_mod_name)
if plugin_mod is not None:
monkeypatch.setattr(plugin_mod, "_psutil", mock_psutil)
else:
pytest.skip("plugin module not yet loaded")
r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect")
assert r.status_code == 200
body = r.json()
assert body["alive"] is False
assert body["pid"] == 999999
assert "not found" in body["reason"]
def test_inspect_run_live_pid(client, monkeypatch):
"""Run with a live PID returns alive=true with psutil fields."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="live-pid", assignee="heidi")
run_id = _insert_run(conn, task_id, worker_pid=12345)
finally:
conn.close()
# Build a realistic mock psutil.
mock_psutil = MagicMock()
mock_psutil.NoSuchProcess = type("NoSuchProcess", (Exception,), {})
mock_psutil.AccessDenied = type("AccessDenied", (Exception,), {})
fake_mem = MagicMock()
fake_mem.rss = 1024 * 1024 * 50 # 50 MB
fake_mem.vms = 1024 * 1024 * 200
fake_proc = MagicMock()
fake_proc.as_dict.return_value = {
"cpu_percent": 3.5,
"memory_info": fake_mem,
"num_threads": 4,
"status": "sleeping",
"create_time": time.time() - 300,
"cmdline": ["python", "-m", "hermes"],
}
fake_proc.num_fds.return_value = 12
mock_psutil.Process.return_value = fake_proc
plugin_mod_name = "hermes_dashboard_plugin_kanban_worker_runs_test"
plugin_mod = sys.modules.get(plugin_mod_name)
if plugin_mod is not None:
monkeypatch.setattr(plugin_mod, "_psutil", mock_psutil)
else:
pytest.skip("plugin module not yet loaded")
r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect")
assert r.status_code == 200
body = r.json()
assert body["alive"] is True
assert body["pid"] == 12345
assert body["cpu_percent"] == 3.5
assert body["memory_rss_bytes"] == fake_mem.rss
assert body["num_threads"] == 4
assert body["status"] == "sleeping"