mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-07 02:51:50 +00:00
feat(kanban): multi-project boards — one install, many kanbans (#19653)
Adds first-class board support to kanban so users can separate unrelated
streams of work (projects, repos, domains) into isolated queues. Single-
project users stay on the 'default' board and see no UI change.
Isolation model
---------------
- Each board is a directory at `~/.hermes/kanban/boards/<slug>/` with
its own `kanban.db`, `workspaces/`, and `logs/`. The 'default' board
keeps its legacy path (`~/.hermes/kanban.db`) for back-compat — fresh
installs and users upgrading from pre-boards versions need zero migration.
- Workers spawned by the dispatcher have `HERMES_KANBAN_BOARD` pinned in
their env alongside the existing `HERMES_KANBAN_DB` /
`HERMES_KANBAN_WORKSPACES_ROOT` pins, so workers physically cannot see
other boards' tasks.
- The gateway's single dispatcher loop now sweeps every board per tick;
per-tick cost is a few extra filesystem stats.
- CAS concurrency guarantees are preserved per-board (each board is its
own SQLite DB, same WAL+IMMEDIATE machinery as before).
CLI
---
hermes kanban boards list|create|switch|show|rename|rm
hermes kanban --board <slug> <any-subcommand>
Board resolution order: `--board` flag → `HERMES_KANBAN_BOARD` env →
`~/.hermes/kanban/current` file → `default`. Slug validation is strict:
lowercase alphanumerics + hyphens + underscores, 1-64 chars, starts with
alphanumeric. Uppercase is auto-downcased; slashes / dots / `..` /
control chars are rejected so boards can't name their way out of the
boards/ directory.
Passive discoverability: when more than one board exists, `hermes kanban
list` prints a one-line header ("Board: foo (2 other boards …)") so
users who would benefit from multi-project boards never have to hunt for
the feature. Invisible for single-board installs.
Dashboard
---------
- New `BoardSwitcher` component at the top of the Kanban tab: dropdown
with all boards + task counts, `+ New board` button, `Archive`
button (non-default only). Hidden entirely when only `default` exists
and is empty — single-project users never see it.
- New `NewBoardDialog` modal: slug / display name / description / icon
+ "switch to this board after creating" checkbox.
- Selected board persists to `localStorage` so browser users don't
shift the CLI's active board out from under a terminal they left open.
- New `?board=<slug>` query param on every existing endpoint plus a
new `/boards` CRUD surface (`GET /boards`, `POST /boards`,
`PATCH /boards/<slug>`, `DELETE /boards/<slug>`,
`POST /boards/<slug>/switch`).
- Events WebSocket is pinned to a board at connection time; switching
opens a fresh WS against the new board.
Also fixes a pre-existing bug in the plugin's tenant / assignee
filters: the SDK's `Select` uses `onValueChange(value)`, not
native `onChange(event)`, so those filters silently didn't work.
New `selectChangeHandler` helper wires both signatures.
Tests
-----
49 new tests in `tests/hermes_cli/test_kanban_boards.py` covering:
slug validation (valid / invalid / auto-downcase), path resolution
(default = legacy path, named = `boards/<slug>/`, env var override),
current-board resolution chain (env > file > default), board CRUD +
archive / hard-delete, per-board connection isolation (tasks don't
leak), worker spawn env injection (`HERMES_KANBAN_BOARD`,
`HERMES_KANBAN_DB`, `HERMES_KANBAN_WORKSPACES_ROOT` all point at the
right board), and end-to-end CLI surface.
Regression surface: all 264 pre-existing kanban tests continue to pass.
Live-tested via the dashboard: created 3 boards (default,
hermes-agent, atm10-server), created tasks on each via both CLI
(`--board <slug> create`) and dashboard (inline create on the Ready
column), confirmed zero cross-board leakage, confirmed `BoardSwitcher`
+ `NewBoardDialog` work end-to-end in the browser.
This commit is contained in:
parent
135b4c8b35
commit
5ec6baa400
8 changed files with 2191 additions and 212 deletions
212
gateway/run.py
212
gateway/run.py
|
|
@ -3277,6 +3277,11 @@ class GatewayRunner:
|
|||
Runs in the gateway event loop; all SQLite work is pushed to a
|
||||
thread via ``asyncio.to_thread`` so the loop never blocks on the
|
||||
WAL lock. Failures in one tick don't stop subsequent ticks.
|
||||
|
||||
**Multi-board:** iterates every board discovered on disk per
|
||||
tick. Subscriptions live inside each board's own DB and cannot
|
||||
cross boards, so delivery semantics are unchanged — this is
|
||||
purely a fan-out of the single-DB poll.
|
||||
"""
|
||||
from gateway.config import Platform as _Platform
|
||||
try:
|
||||
|
|
@ -3309,40 +3314,54 @@ class GatewayRunner:
|
|||
while self._running:
|
||||
try:
|
||||
def _collect():
|
||||
conn = _kb.connect()
|
||||
deliveries: list[dict] = []
|
||||
# Enumerate every board on disk. Cheap: a few
|
||||
# directory stat calls per tick. Missing/empty
|
||||
# boards are silently skipped.
|
||||
try:
|
||||
_kb.init_db() # idempotent; handles first-run
|
||||
boards = _kb.list_boards(include_archived=False)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
subs = _kb.list_notify_subs(conn)
|
||||
deliveries: list[dict] = []
|
||||
for sub in subs:
|
||||
cursor, events = _kb.unseen_events_for_sub(
|
||||
conn,
|
||||
task_id=sub["task_id"],
|
||||
platform=sub["platform"],
|
||||
chat_id=sub["chat_id"],
|
||||
thread_id=sub.get("thread_id") or "",
|
||||
kinds=TERMINAL_KINDS,
|
||||
)
|
||||
if not events:
|
||||
continue
|
||||
task = _kb.get_task(conn, sub["task_id"])
|
||||
deliveries.append({
|
||||
"sub": sub,
|
||||
"cursor": cursor,
|
||||
"events": events,
|
||||
"task": task,
|
||||
})
|
||||
return deliveries
|
||||
finally:
|
||||
conn.close()
|
||||
boards = [_kb.read_board_metadata(_kb.DEFAULT_BOARD)]
|
||||
for board_meta in boards:
|
||||
slug = board_meta.get("slug") or _kb.DEFAULT_BOARD
|
||||
try:
|
||||
conn = _kb.connect(board=slug)
|
||||
except Exception:
|
||||
continue
|
||||
try:
|
||||
try:
|
||||
_kb.init_db(board=slug) # idempotent; handles first-run
|
||||
except Exception:
|
||||
pass
|
||||
subs = _kb.list_notify_subs(conn)
|
||||
for sub in subs:
|
||||
cursor, events = _kb.unseen_events_for_sub(
|
||||
conn,
|
||||
task_id=sub["task_id"],
|
||||
platform=sub["platform"],
|
||||
chat_id=sub["chat_id"],
|
||||
thread_id=sub.get("thread_id") or "",
|
||||
kinds=TERMINAL_KINDS,
|
||||
)
|
||||
if not events:
|
||||
continue
|
||||
task = _kb.get_task(conn, sub["task_id"])
|
||||
deliveries.append({
|
||||
"sub": sub,
|
||||
"cursor": cursor,
|
||||
"events": events,
|
||||
"task": task,
|
||||
"board": slug,
|
||||
})
|
||||
finally:
|
||||
conn.close()
|
||||
return deliveries
|
||||
|
||||
deliveries = await asyncio.to_thread(_collect)
|
||||
for d in deliveries:
|
||||
sub = d["sub"]
|
||||
task = d["task"]
|
||||
board_slug = d.get("board")
|
||||
platform_str = (sub["platform"] or "").lower()
|
||||
try:
|
||||
plat = _Platform(platform_str)
|
||||
|
|
@ -3350,7 +3369,7 @@ class GatewayRunner:
|
|||
# Unknown platform string; skip and advance cursor so
|
||||
# we don't replay forever.
|
||||
await asyncio.to_thread(
|
||||
self._kanban_advance, sub, d["cursor"],
|
||||
self._kanban_advance, sub, d["cursor"], board_slug,
|
||||
)
|
||||
continue
|
||||
adapter = self.adapters.get(plat)
|
||||
|
|
@ -3440,14 +3459,14 @@ class GatewayRunner:
|
|||
"%s on %s after %d consecutive send failures",
|
||||
sub["task_id"], platform_str, fails,
|
||||
)
|
||||
await asyncio.to_thread(self._kanban_unsub, sub)
|
||||
await asyncio.to_thread(self._kanban_unsub, sub, board_slug)
|
||||
sub_fail_counts.pop(sub_key, None)
|
||||
# Don't advance cursor on send failure — retry next tick.
|
||||
break
|
||||
else:
|
||||
# All events delivered; advance cursor + maybe unsub.
|
||||
await asyncio.to_thread(
|
||||
self._kanban_advance, sub, d["cursor"],
|
||||
self._kanban_advance, sub, d["cursor"], board_slug,
|
||||
)
|
||||
# Unsubscribe when the LAST delivered event is a
|
||||
# terminal kind (the task hit a "no further updates"
|
||||
|
|
@ -3459,7 +3478,7 @@ class GatewayRunner:
|
|||
event_terminal = last_kind in TERMINAL_EVENT_KINDS
|
||||
if task_terminal or event_terminal:
|
||||
await asyncio.to_thread(
|
||||
self._kanban_unsub, sub,
|
||||
self._kanban_unsub, sub, board_slug,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("kanban notifier tick failed: %s", exc)
|
||||
|
|
@ -3469,10 +3488,16 @@ class GatewayRunner:
|
|||
return
|
||||
await asyncio.sleep(1)
|
||||
|
||||
def _kanban_advance(self, sub: dict, cursor: int) -> None:
|
||||
"""Sync helper: advance a subscription's cursor. Runs in to_thread."""
|
||||
def _kanban_advance(
|
||||
self, sub: dict, cursor: int, board: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Sync helper: advance a subscription's cursor. Runs in to_thread.
|
||||
|
||||
``board`` scopes the DB connection to the board that owns this
|
||||
subscription. Unsub cursors in one board can't touch another's.
|
||||
"""
|
||||
from hermes_cli import kanban_db as _kb
|
||||
conn = _kb.connect()
|
||||
conn = _kb.connect(board=board)
|
||||
try:
|
||||
_kb.advance_notify_cursor(
|
||||
conn,
|
||||
|
|
@ -3485,9 +3510,9 @@ class GatewayRunner:
|
|||
finally:
|
||||
conn.close()
|
||||
|
||||
def _kanban_unsub(self, sub: dict) -> None:
|
||||
def _kanban_unsub(self, sub: dict, board: Optional[str] = None) -> None:
|
||||
from hermes_cli import kanban_db as _kb
|
||||
conn = _kb.connect()
|
||||
conn = _kb.connect(board=board)
|
||||
try:
|
||||
_kb.remove_notify_sub(
|
||||
conn,
|
||||
|
|
@ -3565,20 +3590,25 @@ class GatewayRunner:
|
|||
bad_ticks = 0
|
||||
last_warn_at = 0
|
||||
|
||||
def _tick_once() -> "Optional[object]":
|
||||
"""Run one dispatch_once; return result or None on error.
|
||||
def _tick_once_for_board(slug: str) -> "Optional[object]":
|
||||
"""Run one dispatch_once for a specific board.
|
||||
|
||||
Runs in a worker thread via `asyncio.to_thread`."""
|
||||
Runs in a worker thread via `asyncio.to_thread`. `board=slug`
|
||||
is passed through `dispatch_once` so `resolve_workspace` and
|
||||
`_default_spawn` see the right paths. The per-board DB is
|
||||
opened explicitly so concurrent boards never share a
|
||||
connection handle or accidentally claim across each other.
|
||||
"""
|
||||
conn = None
|
||||
try:
|
||||
conn = _kb.connect()
|
||||
conn = _kb.connect(board=slug)
|
||||
try:
|
||||
_kb.init_db() # idempotent, handles first-run
|
||||
_kb.init_db(board=slug) # idempotent, handles first-run
|
||||
except Exception:
|
||||
pass
|
||||
return _kb.dispatch_once(conn)
|
||||
return _kb.dispatch_once(conn, board=slug)
|
||||
except Exception:
|
||||
logger.exception("kanban dispatcher: tick failed")
|
||||
logger.exception("kanban dispatcher: tick failed on board %s", slug)
|
||||
return None
|
||||
finally:
|
||||
if conn is not None:
|
||||
|
|
@ -3587,49 +3617,77 @@ class GatewayRunner:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
def _ready_nonempty() -> bool:
|
||||
"""Cheap probe: is there at least one ready+assigned+unclaimed task?"""
|
||||
conn = None
|
||||
def _tick_once() -> "list[tuple[str, Optional[object]]]":
|
||||
"""Run one dispatch_once per board. Returns (slug, result) pairs.
|
||||
|
||||
Enumerating boards on every tick keeps the dispatcher honest
|
||||
when users create a new board mid-run: no restart required,
|
||||
the next tick picks it up automatically.
|
||||
"""
|
||||
try:
|
||||
conn = _kb.connect()
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM tasks "
|
||||
"WHERE status = 'ready' AND assignee IS NOT NULL "
|
||||
" AND claim_lock IS NULL LIMIT 1"
|
||||
).fetchone()
|
||||
return row is not None
|
||||
boards = _kb.list_boards(include_archived=False)
|
||||
except Exception:
|
||||
return False
|
||||
finally:
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
boards = [_kb.read_board_metadata(_kb.DEFAULT_BOARD)]
|
||||
out: list[tuple[str, "Optional[object]"]] = []
|
||||
for b in boards:
|
||||
slug = b.get("slug") or _kb.DEFAULT_BOARD
|
||||
out.append((slug, _tick_once_for_board(slug)))
|
||||
return out
|
||||
|
||||
def _ready_nonempty() -> bool:
|
||||
"""Cheap probe: is there a ready+assigned+unclaimed task on ANY board?"""
|
||||
try:
|
||||
boards = _kb.list_boards(include_archived=False)
|
||||
except Exception:
|
||||
boards = [_kb.read_board_metadata(_kb.DEFAULT_BOARD)]
|
||||
for b in boards:
|
||||
slug = b.get("slug") or _kb.DEFAULT_BOARD
|
||||
conn = None
|
||||
try:
|
||||
conn = _kb.connect(board=slug)
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM tasks "
|
||||
"WHERE status = 'ready' AND assignee IS NOT NULL "
|
||||
" AND claim_lock IS NULL LIMIT 1"
|
||||
).fetchone()
|
||||
if row is not None:
|
||||
return True
|
||||
except Exception:
|
||||
continue
|
||||
finally:
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
"kanban dispatcher: embedded in gateway (interval=%.1fs)", interval
|
||||
)
|
||||
while self._running:
|
||||
try:
|
||||
res = await asyncio.to_thread(_tick_once)
|
||||
if res is not None and getattr(res, "spawned", None):
|
||||
# Quiet by default — only log when something actually
|
||||
# happened, so an idle gateway stays silent.
|
||||
logger.info(
|
||||
"kanban dispatcher: tick spawned=%d reclaimed=%d "
|
||||
"crashed=%d timed_out=%d promoted=%d auto_blocked=%d",
|
||||
len(res.spawned),
|
||||
res.reclaimed,
|
||||
len(res.crashed) if hasattr(res.crashed, "__len__") else 0,
|
||||
len(res.timed_out) if hasattr(res.timed_out, "__len__") else 0,
|
||||
res.promoted,
|
||||
len(res.auto_blocked) if hasattr(res.auto_blocked, "__len__") else 0,
|
||||
)
|
||||
# Health telemetry
|
||||
results = await asyncio.to_thread(_tick_once)
|
||||
any_spawned = False
|
||||
for slug, res in (results or []):
|
||||
if res is not None and getattr(res, "spawned", None):
|
||||
any_spawned = True
|
||||
# Quiet by default — only log when something actually
|
||||
# happened, so an idle gateway stays silent.
|
||||
logger.info(
|
||||
"kanban dispatcher [%s]: spawned=%d reclaimed=%d "
|
||||
"crashed=%d timed_out=%d promoted=%d auto_blocked=%d",
|
||||
slug,
|
||||
len(res.spawned),
|
||||
res.reclaimed,
|
||||
len(res.crashed) if hasattr(res.crashed, "__len__") else 0,
|
||||
len(res.timed_out) if hasattr(res.timed_out, "__len__") else 0,
|
||||
res.promoted,
|
||||
len(res.auto_blocked) if hasattr(res.auto_blocked, "__len__") else 0,
|
||||
)
|
||||
# Health telemetry (aggregate across boards)
|
||||
ready_pending = await asyncio.to_thread(_ready_nonempty)
|
||||
spawned_any = bool(res and getattr(res, "spawned", None))
|
||||
if ready_pending and not spawned_any:
|
||||
if ready_pending and not any_spawned:
|
||||
bad_ticks += 1
|
||||
else:
|
||||
bad_ticks = 0
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue