From 02efad704f5a462891ed6c18b305931bc16adfb8 Mon Sep 17 00:00:00 2001 From: Interstellar-code <33978413+Interstellar-code@users.noreply.github.com> Date: Mon, 18 May 2026 21:01:42 -0700 Subject: [PATCH] feat(kanban): worker visibility endpoints (workers/active, runs/{id}, inspect) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds three read-only endpoints to the kanban dashboard plugin so the SwitchUI workspace (and any other dashboard consumer) can track workers across tasks without N+1 round-trips through /tasks/{task_id}. - GET /workers/active Single SQL JOIN of task_runs + tasks where ended_at IS NULL, worker_pid IS NOT NULL, status='running'. Returns {workers: [...], count, checked_at}. - GET /runs/{run_id} Direct lookup of any task_run row by id. Reuses existing kanban_db.get_run() helper and _run_dict() serialiser. 404 when not found. Mirrors GET /tasks/{task_id} 404 shape. - GET /runs/{run_id}/inspect Live PID stats via psutil.Process.as_dict() — cpu_percent, memory_rss_bytes, memory_vms_bytes, num_threads, num_fds, status, create_time, cmdline. Short-circuits with alive:false when run has ended, has no worker_pid, the pid is gone, or psutil is unavailable. AccessDenied surfaces as alive:true with error rather than a 500. 11 new tests in tests/plugins/test_kanban_worker_runs.py cover the empty-board case, running-task case, ended-run filtering, missing-pid filtering, 404 paths, already-ended inspect, no-pid inspect, dead-pid inspect, and live-pid inspect (psutil mocked). All pass. Companion termination endpoint (POST /runs/{run_id}/terminate) is intentionally out of scope here — opening a separate issue first since the RBAC and dispatcher-mediated soft-cancel design needs maintainer input before code. Co-Authored-By: Claude Opus 4.7 (1M context) --- plugins/kanban/dashboard/plugin_api.py | 162 ++++++++++++ tests/plugins/test_kanban_worker_runs.py | 301 +++++++++++++++++++++++ 2 files changed, 463 insertions(+) create mode 100644 tests/plugins/test_kanban_worker_runs.py diff --git a/plugins/kanban/dashboard/plugin_api.py b/plugins/kanban/dashboard/plugin_api.py index 44bb33960c4..2c5e595e567 100644 --- a/plugins/kanban/dashboard/plugin_api.py +++ b/plugins/kanban/dashboard/plugin_api.py @@ -1050,6 +1050,168 @@ def list_diagnostics( conn.close() + +# --------------------------------------------------------------------------- +# Worker visibility — cross-task active-worker list and per-run inspection +# --------------------------------------------------------------------------- + +try: + import psutil as _psutil +except ImportError: + _psutil = None # type: ignore[assignment] + + +@router.get("/workers/active") +def list_active_workers( + board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"), +): + """Return every currently-running worker on the board. + + A worker is a ``task_runs`` row whose ``ended_at`` is NULL and whose + ``worker_pid`` is non-NULL, belonging to a task with ``status='running'``. + + Returns ``{workers: [...], count: N, checked_at: }``. Each + worker entry carries enough context for the dashboard to link back to + its task without a second round-trip. + """ + board = _resolve_board(board) + conn = _conn(board=board) + try: + rows = conn.execute( + """ + SELECT + r.id AS run_id, + r.task_id, + t.title AS task_title, + t.status AS task_status, + t.assignee AS task_assignee, + r.profile, + r.worker_pid, + r.started_at, + r.claim_lock, + r.claim_expires, + r.last_heartbeat_at, + r.max_runtime_seconds + FROM task_runs r + JOIN tasks t ON t.id = r.task_id + WHERE r.ended_at IS NULL + AND r.worker_pid IS NOT NULL + AND t.status = 'running' + ORDER BY r.started_at ASC + """, + ).fetchall() + workers = [ + { + "run_id": row["run_id"], + "task_id": row["task_id"], + "task_title": row["task_title"], + "task_status": row["task_status"], + "task_assignee": row["task_assignee"], + "profile": row["profile"], + "worker_pid": row["worker_pid"], + "started_at": row["started_at"], + "claim_lock": row["claim_lock"], + "claim_expires": row["claim_expires"], + "last_heartbeat_at": row["last_heartbeat_at"], + "max_runtime_seconds": row["max_runtime_seconds"], + } + for row in rows + ] + return {"workers": workers, "count": len(workers), "checked_at": int(time.time())} + finally: + conn.close() + + +@router.get("/runs/{run_id}") +def get_run_endpoint( + run_id: int, + board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"), +): + """Direct lookup of a ``task_runs`` row by its integer id. + + Returns ``{run: {...}}`` using the same serialisation as the + per-task run history embedded in ``GET /tasks/{task_id}``. + 404 when no such run exists. + """ + board = _resolve_board(board) + conn = _conn(board=board) + try: + r = kanban_db.get_run(conn, run_id) + if r is None: + raise HTTPException(status_code=404, detail=f"run {run_id} not found") + return {"run": _run_dict(r)} + finally: + conn.close() + + +@router.get("/runs/{run_id}/inspect") +def inspect_run_endpoint( + run_id: int, + board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"), +): + """Live PID stats for a run's worker process via psutil. + + If the run has already ended, or has no recorded ``worker_pid``, + returns ``{alive: false}`` with a human-readable ``reason``. + + When the process is live, returns CPU, memory, thread count, fd count, + status, create_time, and cmdline. ``access_denied`` is set when the + OS refuses inspection rather than raising a 500. + + psutil availability: if psutil is not installed the endpoint still + works but ``alive`` is always returned as ``false`` with + ``reason="psutil not available"``. + """ + board = _resolve_board(board) + conn = _conn(board=board) + try: + r = kanban_db.get_run(conn, run_id) + if r is None: + raise HTTPException(status_code=404, detail=f"run {run_id} not found") + finally: + conn.close() + + if r.ended_at is not None: + return {"run_id": run_id, "alive": False, "reason": "run already ended"} + if r.worker_pid is None: + return {"run_id": run_id, "alive": False, "reason": "no worker_pid recorded"} + + pid = r.worker_pid + + if _psutil is None: + return {"run_id": run_id, "alive": False, "pid": pid, "reason": "psutil not available"} + + try: + proc = _psutil.Process(pid) + info = proc.as_dict(attrs=[ + "cpu_percent", "memory_info", "num_threads", + "status", "create_time", "cmdline", + ]) + # num_fds is POSIX-only; skip gracefully on Windows. + try: + num_fds = proc.num_fds() + except AttributeError: + num_fds = None + mem = info.get("memory_info") + return { + "run_id": run_id, + "alive": True, + "pid": pid, + "cpu_percent": info.get("cpu_percent"), + "memory_rss_bytes": mem.rss if mem else None, + "memory_vms_bytes": mem.vms if mem else None, + "num_threads": info.get("num_threads"), + "num_fds": num_fds, + "status": info.get("status"), + "create_time": info.get("create_time"), + "cmdline": info.get("cmdline"), + } + except _psutil.NoSuchProcess: + return {"run_id": run_id, "alive": False, "pid": pid, "reason": "process not found"} + except _psutil.AccessDenied: + return {"run_id": run_id, "alive": True, "pid": pid, "error": "access denied"} + + # --------------------------------------------------------------------------- # Recovery actions — reclaim a running claim, reassign to a new profile # --------------------------------------------------------------------------- diff --git a/tests/plugins/test_kanban_worker_runs.py b/tests/plugins/test_kanban_worker_runs.py new file mode 100644 index 00000000000..ba84d9ea9a8 --- /dev/null +++ b/tests/plugins/test_kanban_worker_runs.py @@ -0,0 +1,301 @@ +"""Tests for kanban worker/runs read endpoints. + +Covers: + GET /workers/active + GET /runs/{run_id} + GET /runs/{run_id}/inspect +""" + +from __future__ import annotations + +import importlib.util +import secrets +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from hermes_cli import kanban_db as kb + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +def _load_plugin_router(): + """Dynamically load plugins/kanban/dashboard/plugin_api.py and return its router.""" + repo_root = Path(__file__).resolve().parents[2] + plugin_file = repo_root / "plugins" / "kanban" / "dashboard" / "plugin_api.py" + assert plugin_file.exists(), f"plugin file missing: {plugin_file}" + + mod_name = "hermes_dashboard_plugin_kanban_worker_runs_test" + # Re-use a cached module if already loaded to avoid duplicate-router issues. + if mod_name in sys.modules: + return sys.modules[mod_name].router + + spec = importlib.util.spec_from_file_location(mod_name, plugin_file) + assert spec is not None and spec.loader is not None + mod = importlib.util.module_from_spec(spec) + sys.modules[mod_name] = mod + spec.loader.exec_module(mod) + return mod.router + + +@pytest.fixture +def kanban_home(tmp_path, monkeypatch): + """Isolated HERMES_HOME with an empty kanban DB.""" + home = tmp_path / ".hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + kb.init_db() + return home + + +@pytest.fixture +def client(kanban_home): + app = FastAPI() + app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban") + return TestClient(app) + + +def _insert_run(conn, task_id, *, worker_pid=None, ended_at=None): + """Insert a task_runs row directly (bypassing claim machinery) and return run_id.""" + lock = secrets.token_hex(8) + future = int(time.time()) + 3600 + cur = conn.execute( + "INSERT INTO task_runs " + "(task_id, status, claim_lock, claim_expires, worker_pid, started_at, ended_at) " + "VALUES (?, 'running', ?, ?, ?, ?, ?)", + (task_id, lock, future, worker_pid, int(time.time()), ended_at), + ) + conn.commit() + return cur.lastrowid + + +# --------------------------------------------------------------------------- +# GET /workers/active +# --------------------------------------------------------------------------- + +def test_workers_active_empty_board(client): + """Board with no running tasks returns an empty workers list.""" + r = client.get("/api/plugins/kanban/workers/active") + assert r.status_code == 200 + body = r.json() + assert body["workers"] == [] + assert body["count"] == 0 + assert "checked_at" in body + + +def test_workers_active_with_running_task(client): + """A running task with an open run row and worker_pid appears in the list.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="active-worker", assignee="alice") + conn.execute( + "UPDATE tasks SET status='running' WHERE id=?", (task_id,), + ) + _insert_run(conn, task_id, worker_pid=12345) + finally: + conn.close() + + r = client.get("/api/plugins/kanban/workers/active") + assert r.status_code == 200 + body = r.json() + assert body["count"] == 1 + w = body["workers"][0] + assert w["task_id"] == task_id + assert w["worker_pid"] == 12345 + assert w["task_status"] == "running" + assert w["task_title"] == "active-worker" + assert w["task_assignee"] == "alice" + + +def test_workers_active_excludes_ended_runs(client): + """Runs with ended_at set are excluded even if task is running.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="ended-run", assignee="bob") + conn.execute("UPDATE tasks SET status='running' WHERE id=?", (task_id,)) + _insert_run(conn, task_id, worker_pid=99999, ended_at=int(time.time()) - 60) + finally: + conn.close() + + r = client.get("/api/plugins/kanban/workers/active") + assert r.status_code == 200 + assert r.json()["count"] == 0 + + +def test_workers_active_excludes_runs_without_pid(client): + """Runs with no worker_pid are not considered active workers.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="no-pid", assignee="carol") + conn.execute("UPDATE tasks SET status='running' WHERE id=?", (task_id,)) + _insert_run(conn, task_id, worker_pid=None) + finally: + conn.close() + + r = client.get("/api/plugins/kanban/workers/active") + assert r.status_code == 200 + assert r.json()["count"] == 0 + + +# --------------------------------------------------------------------------- +# GET /runs/{run_id} +# --------------------------------------------------------------------------- + +def test_get_run_404_unknown_id(client): + """Non-existent run_id returns 404.""" + r = client.get("/api/plugins/kanban/runs/999999") + assert r.status_code == 404 + assert "999999" in r.json()["detail"] + + +def test_get_run_ok(client): + """Existing run row returns 200 with expected shape.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="run-lookup", assignee="dave") + run_id = _insert_run(conn, task_id, worker_pid=55555) + finally: + conn.close() + + r = client.get(f"/api/plugins/kanban/runs/{run_id}") + assert r.status_code == 200 + body = r.json() + assert "run" in body + run = body["run"] + assert run["id"] == run_id + assert run["task_id"] == task_id + assert run["worker_pid"] == 55555 + assert run["ended_at"] is None + + +# --------------------------------------------------------------------------- +# GET /runs/{run_id}/inspect +# --------------------------------------------------------------------------- + +def test_inspect_run_404(client): + """Non-existent run_id returns 404.""" + r = client.get("/api/plugins/kanban/runs/888888/inspect") + assert r.status_code == 404 + + +def test_inspect_run_already_ended(client): + """Run with ended_at set returns alive=false with reason.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="ended", assignee="eve") + run_id = _insert_run(conn, task_id, worker_pid=11111, ended_at=int(time.time()) - 10) + finally: + conn.close() + + r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect") + assert r.status_code == 200 + body = r.json() + assert body["alive"] is False + assert "ended" in body["reason"] + + +def test_inspect_run_no_pid(client): + """Run with no worker_pid returns alive=false with reason.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="no-pid-inspect", assignee="frank") + run_id = _insert_run(conn, task_id, worker_pid=None) + finally: + conn.close() + + r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect") + assert r.status_code == 200 + body = r.json() + assert body["alive"] is False + assert "worker_pid" in body["reason"] + + +def test_inspect_run_dead_pid(client, monkeypatch): + """Run with a non-existent PID returns alive=false via psutil.NoSuchProcess.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="dead-pid", assignee="grace") + run_id = _insert_run(conn, task_id, worker_pid=999999) + finally: + conn.close() + + # Mock psutil to raise NoSuchProcess for any PID. + mock_psutil = MagicMock() + mock_psutil.NoSuchProcess = Exception + mock_psutil.AccessDenied = PermissionError + + def _raise_no_such(*args, **kwargs): + raise mock_psutil.NoSuchProcess("no such process") + + mock_psutil.Process = _raise_no_such + + # Patch the module-level _psutil in the loaded plugin module. + plugin_mod_name = "hermes_dashboard_plugin_kanban_worker_runs_test" + plugin_mod = sys.modules.get(plugin_mod_name) + if plugin_mod is not None: + monkeypatch.setattr(plugin_mod, "_psutil", mock_psutil) + else: + pytest.skip("plugin module not yet loaded") + + r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect") + assert r.status_code == 200 + body = r.json() + assert body["alive"] is False + assert body["pid"] == 999999 + assert "not found" in body["reason"] + + +def test_inspect_run_live_pid(client, monkeypatch): + """Run with a live PID returns alive=true with psutil fields.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="live-pid", assignee="heidi") + run_id = _insert_run(conn, task_id, worker_pid=12345) + finally: + conn.close() + + # Build a realistic mock psutil. + mock_psutil = MagicMock() + mock_psutil.NoSuchProcess = type("NoSuchProcess", (Exception,), {}) + mock_psutil.AccessDenied = type("AccessDenied", (Exception,), {}) + + fake_mem = MagicMock() + fake_mem.rss = 1024 * 1024 * 50 # 50 MB + fake_mem.vms = 1024 * 1024 * 200 + + fake_proc = MagicMock() + fake_proc.as_dict.return_value = { + "cpu_percent": 3.5, + "memory_info": fake_mem, + "num_threads": 4, + "status": "sleeping", + "create_time": time.time() - 300, + "cmdline": ["python", "-m", "hermes"], + } + fake_proc.num_fds.return_value = 12 + mock_psutil.Process.return_value = fake_proc + + plugin_mod_name = "hermes_dashboard_plugin_kanban_worker_runs_test" + plugin_mod = sys.modules.get(plugin_mod_name) + if plugin_mod is not None: + monkeypatch.setattr(plugin_mod, "_psutil", mock_psutil) + else: + pytest.skip("plugin module not yet loaded") + + r = client.get(f"/api/plugins/kanban/runs/{run_id}/inspect") + assert r.status_code == 200 + body = r.json() + assert body["alive"] is True + assert body["pid"] == 12345 + assert body["cpu_percent"] == 3.5 + assert body["memory_rss_bytes"] == fake_mem.rss + assert body["num_threads"] == 4 + assert body["status"] == "sleeping"